add binlog deploy and check process #329

Merged
177 changes: 175 additions & 2 deletions tests/actions.go
@@ -67,8 +67,8 @@ const (
DefaultPollTimeout time.Duration = 10 * time.Minute
DefaultPollInterval time.Duration = 10 * time.Second
getBackupDirPodName = "get-backup-dir"
grafanaUsername = "admin"
grafanaPassword = "admin"
)

type OperatorActions interface {
@@ -1475,9 +1475,182 @@ func (info *TidbClusterInfo) FullName() string {
}

func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterInfo, to *TidbClusterInfo) error {
glog.Infof("begin to deploy incremental backup cluster[%s] namespace[%s]", from.ClusterName, from.Namespace)
defer func() {
glog.Infof("deploy incremental backup end cluster[%s] namespace[%s]", to.ClusterName, to.Namespace)
}()
sets := map[string]string{
Review comment (Contributor): merge these with from sets

Reply (Author): done!
"binlog.pump.create": "true",
"binlog.drainer.destDBType": "mysql",
"binlog.drainer.create": "true",
"binlog.drainer.mysql.host": fmt.Sprintf("%s-tidb.%s", to.ClusterName, to.Namespace),
"binlog.drainer.mysql.user": "root",
"binlog.drainer.mysql.password": to.Password,
"binlog.drainer.mysql.port": "4000",
}

var buffer bytes.Buffer
for k, v := range sets {
set := fmt.Sprintf(" --set %s=%s", k, v)
_, err := buffer.WriteString(set)
if err != nil {
return err
}
}

setStr := buffer.String()
cmd := fmt.Sprintf("helm upgrade %s /charts/%s/tidb-cluster %s",
from.ClusterName, from.OperatorTag, setStr)
glog.Info(cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v, %s", err, string(res))
}
return nil
}
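
Note: Go randomizes map iteration order, so the --set flags in the generated helm command come out in a different order on every run. A minimal standalone sketch of a deterministic alternative; the helper name buildSetFlags is hypothetical and not part of this PR:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildSetFlags renders helm --set flags in sorted key order so the
// resulting command line is stable across runs.
func buildSetFlags(sets map[string]string) string {
	keys := make([]string, 0, len(sets))
	for k := range sets {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, " --set %s=%s", k, sets[k])
	}
	return b.String()
}

func main() {
	fmt.Println(buildSetFlags(map[string]string{
		"binlog.pump.create":    "true",
		"binlog.drainer.create": "true",
	}))
}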

func (oa *operatorActions) CheckIncrementalBackup(info *TidbClusterInfo) error {
glog.Infof("begin to check incremental backup cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
defer func() {
glog.Infof("check incremental backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
}()

pumpStatefulSetName := fmt.Sprintf("%s-pump", info.ClusterName)
fn := func() (bool, error) {
pumpStatefulSet, err := oa.kubeCli.AppsV1().StatefulSets(info.Namespace).Get(pumpStatefulSetName, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get jobs %s ,%v", pumpStatefulSetName, err)
return false, nil
}
if pumpStatefulSet.Status.Replicas != pumpStatefulSet.Status.ReadyReplicas {
glog.Errorf("pump replicas is not ready, please wait ! %s ", pumpStatefulSetName)
return false, nil
}

listOps := metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(
pumpStatefulSet.Labels,
).String(),
}

pods, err := oa.kubeCli.CoreV1().Pods(info.Namespace).List(listOps)
if err != nil {
glog.Errorf("failed to get pods via pump labels %s ,%v", pumpStatefulSetName, err)
return false, nil
}

for _, pod := range pods.Items {
if !oa.pumpHealth(info, pod.Spec.Hostname) {
glog.Errorf("some pods is not health %s ,%v", pumpStatefulSetName, err)
return false, nil
}
}

drainerStatefulSetName := fmt.Sprintf("%s-drainer", info.ClusterName)
drainerStatefulSet, err := oa.kubeCli.AppsV1().StatefulSets(info.Namespace).Get(drainerStatefulSetName, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get jobs %s ,%v", pumpStatefulSetName, err)
return false, nil
}
if drainerStatefulSet.Status.Replicas != drainerStatefulSet.Status.ReadyReplicas {
glog.Errorf("drainer replicas is not ready, please wait ! %s ", pumpStatefulSetName)
return false, nil
}

listOps = metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(
drainerStatefulSet.Labels,
).String(),
}

pods, err = oa.kubeCli.CoreV1().Pods(info.Namespace).List(listOps)
if err != nil {
return false, nil
}
for _, pod := range pods.Items {
if !oa.drainerHealth(info, pod.Spec.Hostname) {
return false, nil
}
}

return true, nil
}

err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)
if err != nil {
return fmt.Errorf("failed to launch scheduler backup job: %v", err)
}
return nil
}
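
Note on the polling contract: wait.Poll calls the condition function every DefaultPollInterval (10 seconds) until it returns (true, nil), returns a non-nil error (which aborts immediately), or DefaultPollTimeout (10 minutes) elapses. That is why every failure path above returns (false, nil): transient errors should trigger a retry rather than fail the whole check. A minimal self-contained sketch of the semantics, assuming the same k8s.io/apimachinery/pkg/util/wait package:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()
	// The condition reports "not done yet" until 3 seconds have passed;
	// wait.Poll keeps retrying once per second until then.
	err := wait.Poll(time.Second, 10*time.Second, func() (bool, error) {
		return time.Since(start) > 3*time.Second, nil
	})
	fmt.Println(err) // nil: the condition became true before the timeout
}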

type pumpStatus struct {
StatusMap map[string]*nodeStatus
}

type nodeStatus struct {
State string `json:"state"`
}

func (oa *operatorActions) pumpHealth(info *TidbClusterInfo, hostName string) bool {
pumpHealthUrl := fmt.Sprintf("http://%s.%s-pump.%s:8250/status", hostName, info.ClusterName, info.Namespace)
res, err := http.Get(pumpHealthUrl)
if err != nil {
glog.Errorf("cluster:[%s] call %s failed, error: %v", info.ClusterName, pumpHealthUrl, err)
return false
}
defer res.Body.Close()
if res.StatusCode >= 400 {
glog.Errorf("Error response %v", res.StatusCode)
return false
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
glog.Errorf("cluster:[%s] read response body failed,error:%v", info.ClusterName, err)
return false
}
healths := pumpStatus{}
err = json.Unmarshal(body, &healths)
if err != nil {
glog.Errorf("cluster:[%s] unmarshal failed,error:%v", info.ClusterName, err)
return false
}
for _, status := range healths.StatusMap {
if status.State != "online" {
glog.Errorf("cluster:[%s] pump's state is not online", info.ClusterName)
return false
}
}
return true
}
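
For reference, a hypothetical payload shaped to match the structs above; the real pump /status response carries more fields, which json.Unmarshal simply ignores. Since StatusMap has no json tag, the key is matched against the field name case-insensitively:

package main

import (
	"encoding/json"
	"fmt"
)

type nodeStatus struct {
	State string `json:"state"`
}

type pumpStatus struct {
	StatusMap map[string]*nodeStatus
}

func main() {
	// Hypothetical response body; only the fields the structs declare
	// are decoded.
	body := []byte(`{"StatusMap": {"pump-0": {"state": "online"}}}`)
	var s pumpStatus
	if err := json.Unmarshal(body, &s); err != nil {
		panic(err)
	}
	for name, n := range s.StatusMap {
		fmt.Printf("%s: %s\n", name, n.State) // pump-0: online
	}
}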

type drainerStatus struct {
PumpPos map[string]int64 `json:"PumpPos"`
Synced bool `json:"Synced"`
LastTS int64 `json:"LastTS"`
TsMap string `json:"TsMap"`
}

func (oa *operatorActions) drainerHealth(info *TidbClusterInfo, hostName string) bool {
drainerHealthUrl := fmt.Sprintf("http://%s.%s-drainer.%s:8249/status", hostName, info.ClusterName, info.Namespace)
res, err := http.Get(drainerHealthUrl)
if err != nil {
glog.Errorf("cluster:[%s] call %s failed, error: %v", info.ClusterName, drainerHealthUrl, err)
return false
}
defer res.Body.Close()
if res.StatusCode >= 400 {
glog.Errorf("Error response %v", res.StatusCode)
return false
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
glog.Errorf("cluster:[%s] read response body failed,error:%v", info.ClusterName, err)
return false
}
healths := drainerStatus{}
err = json.Unmarshal(body, &healths)
if err != nil {
glog.Errorf("cluster:[%s] unmarshal failed,error:%v", info.ClusterName, err)
return false
}
return len(healths.PumpPos) > 0 && healths.Synced
}
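
The drainer health predicate is: the drainer must know at least one pump position and report Synced. A small sketch against a hypothetical payload, reusing the drainerStatus struct above:

package main

import (
	"encoding/json"
	"fmt"
)

type drainerStatus struct {
	PumpPos map[string]int64 `json:"PumpPos"`
	Synced  bool             `json:"Synced"`
	LastTS  int64            `json:"LastTS"`
	TsMap   string           `json:"TsMap"`
}

func main() {
	// Hypothetical response body with one pump position and Synced set.
	body := []byte(`{"PumpPos": {"pump-0": 407036}, "Synced": true, "LastTS": 407036, "TsMap": ""}`)
	var s drainerStatus
	if err := json.Unmarshal(body, &s); err != nil {
		panic(err)
	}
	fmt.Println(len(s.PumpPos) > 0 && s.Synced) // true: healthy
}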
38 changes: 38 additions & 0 deletions tests/backup/backupcase.go
@@ -14,6 +14,9 @@
package backup

import (
"fmt"
"time"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/tests"
)
@@ -78,5 +81,40 @@ func (bc *BackupCase) Run() error {
return err
}

err = bc.operator.DeployIncrementalBackup(bc.srcCluster, bc.desCluster)
if err != nil {
return err
}

err = bc.operator.CheckIncrementalBackup(bc.srcCluster)
if err != nil {
return err
}

glog.Infof("waiting 1 minutes for binlog to work")
time.Sleep(1 * time.Minute)

glog.Infof("cluster[%s] begin insert data")
go bc.operator.BeginInsertDataTo(bc.srcCluster)

time.Sleep(30 * time.Second)

glog.Infof("cluster[%s] stop insert data")
bc.operator.StopInsertDataTo(bc.srcCluster)

time.Sleep(5 * time.Second)

srcCount, err := bc.srcCluster.QueryCount()
if err != nil {
return err
}
desCount, err := bc.desCluster.QueryCount()
if err != nil {
return err
}
if srcCount != desCount {
return fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equals des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount)
}

return nil
}
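
Note: the fixed 5-second sleep before comparing counts can still race with replication lag on a slow cluster. A sketch of a polling alternative; waitForBinlogSync is a hypothetical helper, not part of this PR, and assumes the QueryCount method used above plus k8s.io/apimachinery/pkg/util/wait:

// Hypothetical helper; assumed imports: "time",
// "k8s.io/apimachinery/pkg/util/wait", and the tests package for
// TidbClusterInfo.
func waitForBinlogSync(src, des *tests.TidbClusterInfo) error {
	return wait.Poll(5*time.Second, 5*time.Minute, func() (bool, error) {
		srcCount, err := src.QueryCount()
		if err != nil {
			return false, nil // transient query failure; keep polling
		}
		desCount, err := des.QueryCount()
		if err != nil {
			return false, nil
		}
		return srcCount == desCount, nil
	})
}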
40 changes: 20 additions & 20 deletions tests/cmd/e2e/main.go
@@ -118,26 +118,6 @@ func main() {
glog.Fatal(err)
}

clusterInfo = clusterInfo.ScaleTiDB(3)
if err := oa.ScaleTidbCluster(clusterInfo); err != nil {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo})
glog.Fatal(err)
}
if err = oa.CheckTidbClusterStatus(clusterInfo); err != nil {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo})
glog.Fatal(err)
}

clusterInfo = clusterInfo.UpgradeAll("v2.1.4")
if err = oa.UpgradeTidbCluster(clusterInfo); err != nil {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo})
glog.Fatal(err)
}
if err = oa.CheckTidbClusterStatus(clusterInfo); err != nil {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo})
glog.Fatal(err)
}

restoreClusterInfo := &tests.TidbClusterInfo{
BackupPVC: "test-backup",
Namespace: "tidb",
@@ -185,4 +165,24 @@ func main() {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo, restoreClusterInfo})
glog.Fatal(err)
}

clusterInfo = clusterInfo.ScaleTiDB(3)
if err := oa.ScaleTidbCluster(clusterInfo); err != nil {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo})
glog.Fatal(err)
}
if err = oa.CheckTidbClusterStatus(clusterInfo); err != nil {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo})
glog.Fatal(err)
}

clusterInfo = clusterInfo.UpgradeAll("v2.1.4")
if err = oa.UpgradeTidbCluster(clusterInfo); err != nil {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo})
glog.Fatal(err)
}
if err = oa.CheckTidbClusterStatus(clusterInfo); err != nil {
oa.DumpAllLogs(operatorInfo, []*tests.TidbClusterInfo{clusterInfo})
glog.Fatal(err)
}
}