Skip to content

Commit

Permalink
Fix backup data compare logic (#454)
Browse files Browse the repository at this point in the history
* fix data compare logic

* add test to binlog backup

* move backup to tests
  • Loading branch information
weekface authored and tennix committed May 6, 2019
1 parent eebd686 commit 3adc027
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 187 deletions.
88 changes: 59 additions & 29 deletions tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ type OperatorActions interface {
RegisterWebHookAndServiceOrDie(info *OperatorConfig)
CleanWebHookAndService(info *OperatorConfig) error
StartValidatingAdmissionWebhookServerOrDie(info *OperatorConfig)
BackupRestore(from, to *TidbClusterConfig) error
BackupRestoreOrDie(from, to *TidbClusterConfig)
}

type operatorActions struct {
Expand Down Expand Up @@ -1537,24 +1539,15 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterConfig, to *TidbCluster
return false, nil
}

fromCount, err := from.QueryCount()
b, err := to.DataIsTheSameAs(from)
if err != nil {
glog.Errorf("cluster [%s] count err ", from.ClusterName)
glog.Error(err)
return false, nil
}

toCount, err := to.QueryCount()
if err != nil {
glog.Errorf("cluster [%s] count err ", to.ClusterName)
return false, nil
}

if fromCount != toCount {
glog.Errorf("cluster [%s] count %d cluster [%s] count %d is not equal ",
from.ClusterName, fromCount, to.ClusterName, toCount)
return false, nil
if b {
return true, nil
}
return true, nil
return false, nil
}

err := wait.Poll(oa.pollInterval, BackupAndRestorePollTimeOut, fn)
Expand All @@ -1576,29 +1569,65 @@ func (oa *operatorActions) ForceDeploy(info *TidbClusterConfig) error {
return nil
}

func (info *TidbClusterConfig) QueryCount() (int, error) {
tableName := "test"
db, err := sql.Open("mysql", getDSN(info.Namespace, info.ClusterName, "record", info.Password))
func (info *TidbClusterConfig) DataIsTheSameAs(otherInfo *TidbClusterConfig) (bool, error) {
tableNum := otherInfo.BlockWriteConfig.TableNum

infoDb, err := sql.Open("mysql", getDSN(info.Namespace, info.ClusterName, "test", info.Password))
if err != nil {
return 0, err
return false, err
}
defer db.Close()

rows, err := db.Query(fmt.Sprintf("SELECT count(*) FROM %s", tableName))
defer infoDb.Close()
otherInfoDb, err := sql.Open("mysql", getDSN(otherInfo.Namespace, otherInfo.ClusterName, "test", otherInfo.Password))
if err != nil {
glog.Infof("cluster:[%s], error: %v", info.ClusterName, err)
return 0, err
return false, err
}
defer otherInfoDb.Close()

getCntFn := func(db *sql.DB, tableName string) (int, error) {
var cnt int
rows, err := db.Query(fmt.Sprintf("SELECT count(*) FROM %s", tableName))
if err != nil {
return cnt, fmt.Errorf("failed to select count(*) from %s, %v", tableName, err)
}
for rows.Next() {
err := rows.Scan(&cnt)
if err != nil {
return cnt, fmt.Errorf("failed to scan count from %s, %v", tableName, err)
}
return cnt, nil
}
return cnt, fmt.Errorf("can not find count of table %s", tableName)
}

for rows.Next() {
var count int
err := rows.Scan(&count)
for i := 0; i < tableNum; i++ {
var tableName string
if i == 0 {
tableName = "block_writer"
} else {
tableName = fmt.Sprintf("block_writer%d", i)
}

cnt, err := getCntFn(infoDb, tableName)
if err != nil {
glog.Infof("cluster:[%s], error :%v", info.ClusterName, err)
return false, err
}
return count, nil
otherCnt, err := getCntFn(otherInfoDb, tableName)
if err != nil {
return false, err
}

if cnt != otherCnt {
err := fmt.Errorf("cluster %s/%s's table %s count(*) = %d and cluster %s/%s's table %s count(*) = %d",
info.Namespace, info.ClusterName, tableName, cnt,
otherInfo.Namespace, otherInfo.ClusterName, tableName, otherCnt)
return false, err
}
glog.Infof("cluster %s/%s's table %s count(*) = %d and cluster %s/%s's table %s count(*) = %d",
info.Namespace, info.ClusterName, tableName, cnt,
otherInfo.Namespace, otherInfo.ClusterName, tableName, otherCnt)
}
return 0, fmt.Errorf("can not find count of ")

return true, nil
}

func (oa *operatorActions) CreateSecret(info *TidbClusterConfig) error {
Expand Down Expand Up @@ -1875,6 +1904,7 @@ func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterConfig, to *
"binlog.drainer.mysql.user": "root",
"binlog.drainer.mysql.password": to.Password,
"binlog.drainer.mysql.port": "4000",
"binlog.drainer.ignoreSchemas": "\"INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql\"",
}

setString := from.TidbClusterHelmSetString(sets)
Expand Down
99 changes: 99 additions & 0 deletions tests/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package tests

import (
"time"

"github.com/golang/glog"
"github.com/pingcap/tidb-operator/tests/slack"
"k8s.io/apimachinery/pkg/util/wait"
)

func (oa *operatorActions) BackupRestore(from, to *TidbClusterConfig) error {
oa.StopInsertDataTo(from)

err := oa.DeployAdHocBackup(from)
if err != nil {
glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err)
return err
}

err = oa.CheckAdHocBackup(from)
if err != nil {
glog.Errorf("cluster:[%s] deploy happen error: %v", from.ClusterName, err)
return err
}

err = oa.CheckTidbClusterStatus(to)
if err != nil {
glog.Errorf("cluster:[%s] deploy faild error: %v", to.ClusterName, err)
return err
}

err = oa.Restore(from, to)
if err != nil {
glog.Errorf("from cluster:[%s] to cluster [%s] restore happen error: %v",
from.ClusterName, to.ClusterName, err)
return err
}

err = oa.CheckRestore(from, to)
if err != nil {
glog.Errorf("from cluster:[%s] to cluster [%s] restore failed error: %v",
from.ClusterName, to.ClusterName, err)
return err
}

go oa.BeginInsertDataToOrDie(from)
err = oa.DeployScheduledBackup(from)
if err != nil {
glog.Errorf("cluster:[%s] scheduler happen error: %v", from.ClusterName, err)
return err
}

err = oa.CheckScheduledBackup(from)
if err != nil {
glog.Errorf("cluster:[%s] scheduler failed error: %v", from.ClusterName, err)
return err
}

err = oa.DeployIncrementalBackup(from, to)
if err != nil {
return err
}

err = oa.CheckIncrementalBackup(from)
if err != nil {
return err
}

glog.Infof("waiting 1 minutes for binlog to work")
time.Sleep(1 * time.Minute)

glog.Infof("cluster[%s] begin insert data", from.ClusterName)
go oa.BeginInsertDataTo(from)

time.Sleep(5 * time.Minute)

glog.Infof("cluster[%s] stop insert data", from.ClusterName)
oa.StopInsertDataTo(from)

fn := func() (bool, error) {
b, err := to.DataIsTheSameAs(from)
if err != nil {
glog.Error(err)
return false, nil
}
if b {
return true, nil
}
return false, nil
}

return wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn)
}

func (oa *operatorActions) BackupRestoreOrDie(from, to *TidbClusterConfig) {
if err := oa.BackupRestore(from, to); err != nil {
slack.NotifyAndPanic(err)
}
}
1 change: 0 additions & 1 deletion tests/backup/backup.go

This file was deleted.

148 changes: 0 additions & 148 deletions tests/backup/backupcase.go

This file was deleted.

7 changes: 1 addition & 6 deletions tests/cmd/e2e/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"k8s.io/apiserver/pkg/util/logs"

"github.com/pingcap/tidb-operator/tests"
"github.com/pingcap/tidb-operator/tests/backup"
"github.com/pingcap/tidb-operator/tests/pkg/client"
)

Expand Down Expand Up @@ -245,11 +244,7 @@ func main() {
glog.Fatal(err)
}

backupCase := backup.NewBackupCase(oa, backupClusterInfo, restoreClusterInfo)

if err := backupCase.Run(); err != nil {
glog.Fatal(err)
}
oa.BackupRestoreOrDie(backupClusterInfo, restoreClusterInfo)

//clean temp dirs when e2e success
err = conf.CleanTempDirs()
Expand Down
Loading

0 comments on commit 3adc027

Please sign in to comment.