Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup/Restore: support configuring TiKV GC life time #1835

Merged
merged 15 commits into from
Mar 4, 2020
Merged
10 changes: 2 additions & 8 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,16 @@ import (
"os/exec"

"github.com/gogo/protobuf/proto"
"k8s.io/klog"

kvbackup "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"k8s.io/klog"
)

// Options contains the input arguments to the backup command
type Options struct {
Namespace string
BackupName string
}

func (bo *Options) String() string {
return fmt.Sprintf("%s/%s", bo.Namespace, bo.BackupName)
util.GenericOptions
}

func (bo *Options) backupData(backup *v1alpha1.Backup) (string, error) {
Expand Down
139 changes: 132 additions & 7 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
package backup

import (
"database/sql"
"fmt"
"time"

"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
bkconstants "github.com/pingcap/tidb-operator/pkg/backup/constants"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
)

Expand All @@ -44,11 +49,29 @@ func NewManager(
}
}

func (bm *Manager) setOptions(backup *v1alpha1.Backup) {
bm.Options.Host = backup.Spec.From.Host

if backup.Spec.From.Port != 0 {
bm.Options.Port = backup.Spec.From.Port
} else {
bm.Options.Port = bkconstants.DefaultTidbPort
}

if backup.Spec.From.User != "" {
bm.Options.User = backup.Spec.From.User
} else {
bm.Options.User = bkconstants.DefaultTidbUser
}

bm.Options.Password = util.GetOptionValueFromEnv(bkconstants.TidbPasswordKey, bkconstants.BackupManagerEnvVarPrefix)
}

// ProcessBackup used to process the backup logic
func (bm *Manager) ProcessBackup() error {
backup, err := bm.backupLister.Backups(bm.Namespace).Get(bm.BackupName)
backup, err := bm.backupLister.Backups(bm.Namespace).Get(bm.ResourceName)
if err != nil {
klog.Errorf("can't find cluster %s backup %s CRD object, err: %v", bm, bm.BackupName, err)
klog.Errorf("can't find cluster %s backup %s CRD object, err: %v", bm, bm.ResourceName, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Expand All @@ -60,10 +83,34 @@ func (bm *Manager) ProcessBackup() error {
if backup.Spec.BR == nil {
return fmt.Errorf("no br config in %s", bm)
}
return bm.performBackup(backup.DeepCopy())

bm.setOptions(backup)

var db *sql.DB
err = wait.PollImmediate(constants.PollInterval, constants.CheckTimeout, func() (done bool, err error) {
db, err = util.OpenDB(bm.GetDSN())
if err != nil {
klog.Warningf("can't connect to tidb cluster %s, err: %s", bm, err)
return false, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return nil for the error case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://godoc.org/k8s.io/apimachinery/pkg/util/wait#ConditionFunc ConditionFunc returns true if the condition is satisfied, or an error if the loop should be aborted.

}
return true, nil
})

if err != nil {
klog.Errorf("cluster %s connect failed, err: %s", bm, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ConnectTidbFailed",
Message: err.Error(),
})
}

defer db.Close()
return bm.performBackup(backup.DeepCopy(), db)
}

func (bm *Manager) performBackup(backup *v1alpha1.Backup) error {
func (bm *Manager) performBackup(backup *v1alpha1.Backup, db *sql.DB) error {
started := time.Now()

err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Expand All @@ -74,16 +121,94 @@ func (bm *Manager) performBackup(backup *v1alpha1.Backup) error {
return err
}

backupFullPath, err := bm.backupData(backup)
oldTikvGCTime, err := bm.GetTikvGCLifeTime(db)
if err != nil {
klog.Errorf("backup cluster %s data failed, err: %s", bm, err)
klog.Errorf("cluster %s get %s failed, err: %s", bm, constants.TikvGCVariable, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "BackupDataToRemoteFailed",
Reason: "GetTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
klog.Infof("cluster %s %s is %s", bm, constants.TikvGCVariable, oldTikvGCTime)

oldTikvGCTimeDuration, err := time.ParseDuration(oldTikvGCTime)
if err != nil {
klog.Errorf("cluster %s parse old %s failed, err: %s", bm, constants.TikvGCVariable, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ParseOldTikvGCLifeTimeFailed",
Message: err.Error(),
})
}

var tikvGCTimeDuration time.Duration
var tikvGCLifeTime string
if backup.Spec.TikvGCLifeTime != nil {
tikvGCLifeTime = *backup.Spec.TikvGCLifeTime
tikvGCTimeDuration, err = time.ParseDuration(tikvGCLifeTime)
if err != nil {
klog.Errorf("cluster %s parse configured %s failed, err: %s", bm, constants.TikvGCVariable, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ParseConfiguredTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
} else {
tikvGCLifeTime = constants.TikvGCLifeTime
tikvGCTimeDuration, err = time.ParseDuration(tikvGCLifeTime)
if err != nil {
klog.Errorf("cluster %s parse default %s failed, err: %s", bm, constants.TikvGCVariable, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ParseDefaultTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
}

if oldTikvGCTimeDuration < tikvGCTimeDuration {
err = bm.SetTikvGCLifeTime(db, tikvGCLifeTime)
if err != nil {
klog.Errorf("cluster %s set tikv GC life time to %s failed, err: %s", bm, tikvGCLifeTime, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "SetTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
klog.Infof("set cluster %s %s to %s success", bm, constants.TikvGCVariable, tikvGCLifeTime)
}

backupFullPath, backupErr := bm.backupData(backup)
if oldTikvGCTimeDuration < tikvGCTimeDuration {
err = bm.SetTikvGCLifeTime(db, oldTikvGCTime)
if err != nil {
klog.Errorf("cluster %s reset tikv GC life time to %s failed, err: %s", bm, oldTikvGCTime, err)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "ResetTikvGCLifeTimeFailed",
Message: err.Error(),
})
}
klog.Infof("reset cluster %s %s to %s success", bm, constants.TikvGCVariable, oldTikvGCTime)
}
if backupErr != nil {
klog.Errorf("backup cluster %s data failed, err: %s", bm, backupErr)
return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Reason: "BackupDataToRemoteFailed",
Message: backupErr.Error(),
})
}
klog.Infof("backup cluster %s data to %s success", bm, backupFullPath)

// Note: The size get from remote may be incorrect because the blobs
Expand Down
2 changes: 1 addition & 1 deletion cmd/backup-manager/app/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewBackupCommand() *cobra.Command {
}

cmd.Flags().StringVar(&bo.Namespace, "namespace", "", "Backup CR's namespace")
cmd.Flags().StringVar(&bo.BackupName, "backupName", "", "Backup CRD object name")
cmd.Flags().StringVar(&bo.ResourceName, "backupName", "", "Backup CRD object name")
return cmd
}

Expand Down
12 changes: 3 additions & 9 deletions cmd/backup-manager/app/cmd/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/export"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
bkconstants "github.com/pingcap/tidb-operator/pkg/backup/constants"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/spf13/cobra"
Expand All @@ -32,7 +31,7 @@ import (

// NewExportCommand implements the backup command
func NewExportCommand() *cobra.Command {
bo := export.BackupOpts{}
bo := export.Options{}

cmd := &cobra.Command{
Use: "export",
Expand All @@ -44,18 +43,13 @@ func NewExportCommand() *cobra.Command {
}

cmd.Flags().StringVar(&bo.Namespace, "namespace", "", "Backup CR's namespace")
cmd.Flags().StringVar(&bo.Host, "host", "", "Tidb cluster access address")
cmd.Flags().Int32Var(&bo.Port, "port", bkconstants.DefaultTidbPort, "Port number to use for connecting tidb cluster")
cmd.Flags().StringVar(&bo.ResourceName, "backupName", "", "Backup CRD object name")
cmd.Flags().StringVar(&bo.Bucket, "bucket", "", "Bucket in which to store the backup data")
cmd.Flags().StringVar(&bo.Password, bkconstants.TidbPasswordKey, "", "Password to use when connecting to tidb cluster")
cmd.Flags().StringVar(&bo.User, "user", "", "User for login tidb cluster")
cmd.Flags().StringVar(&bo.StorageType, "storageType", "", "Backend storage type")
cmd.Flags().StringVar(&bo.BackupName, "backupName", "", "Backup CRD object name")
util.SetFlagsFromEnv(cmd.Flags(), bkconstants.BackupManagerEnvVarPrefix)
return cmd
}

func runExport(backupOpts export.BackupOpts, kubecfg string) error {
func runExport(backupOpts export.Options, kubecfg string) error {
kubeCli, cli, err := util.NewKubeAndCRCli(kubecfg)
cmdutil.CheckErr(err)
options := []informers.SharedInformerOption{
Expand Down
12 changes: 3 additions & 9 deletions cmd/backup-manager/app/cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/import"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
bkconstants "github.com/pingcap/tidb-operator/pkg/backup/constants"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/spf13/cobra"
Expand All @@ -32,7 +31,7 @@ import (

// NewImportCommand implements the restore command
func NewImportCommand() *cobra.Command {
ro := _import.RestoreOpts{}
ro := _import.Options{}

cmd := &cobra.Command{
Use: "import",
Expand All @@ -44,17 +43,12 @@ func NewImportCommand() *cobra.Command {
}

cmd.Flags().StringVar(&ro.Namespace, "namespace", "", "Restore CR's namespace")
cmd.Flags().StringVar(&ro.Host, "host", "", "Tidb cluster access address")
cmd.Flags().Int32Var(&ro.Port, "port", bkconstants.DefaultTidbPort, "Port number to use for connecting tidb cluster")
cmd.Flags().StringVar(&ro.Password, bkconstants.TidbPasswordKey, "", "Password to use when connecting to tidb cluster")
cmd.Flags().StringVar(&ro.User, "user", "", "User for login tidb cluster")
cmd.Flags().StringVar(&ro.RestoreName, "restoreName", "", "Restore CRD object name")
cmd.Flags().StringVar(&ro.ResourceName, "restoreName", "", "Restore CRD object name")
cmd.Flags().StringVar(&ro.BackupPath, "backupPath", "", "The location of the backup")
util.SetFlagsFromEnv(cmd.Flags(), bkconstants.BackupManagerEnvVarPrefix)
return cmd
}

func runImport(restoreOpts _import.RestoreOpts, kubecfg string) error {
func runImport(restoreOpts _import.Options, kubecfg string) error {
kubeCli, cli, err := util.NewKubeAndCRCli(kubecfg)
cmdutil.CheckErr(err)
options := []informers.SharedInformerOption{
Expand Down
4 changes: 1 addition & 3 deletions cmd/backup-manager/app/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/restore"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
bkconstants "github.com/pingcap/tidb-operator/pkg/backup/constants"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/spf13/cobra"
Expand All @@ -44,8 +43,7 @@ func NewRestoreCommand() *cobra.Command {
}

cmd.Flags().StringVar(&ro.Namespace, "namespace", "", "Restore CR's namespace")
cmd.Flags().StringVar(&ro.RestoreName, "restoreName", "", "Restore CRD object name")
util.SetFlagsFromEnv(cmd.Flags(), bkconstants.BackupManagerEnvVarPrefix)
cmd.Flags().StringVar(&ro.ResourceName, "restoreName", "", "Restore CRD object name")
return cmd
}

Expand Down
Loading