feat: support tiflash backup and restore during volume snapshot #4812

Merged (8 commits, Jan 8, 2023)
2 changes: 1 addition & 1 deletion cmd/backup-manager/app/backup/backup.go
@@ -71,7 +71,7 @@ func (bo *Options) backupData(
)
localCSBFile := path.Join(util.BRBinPath, "csb_backup.json")
// read cluster meta from external storage and pass it to BR
klog.Infof("read the restore meta from external storage")
klog.Infof("read the cluster meta from external storage")
externalStorage, err := pkgutil.NewStorageBackend(backup.Spec.StorageProvider, &pkgutil.StorageCredential{})
if err != nil {
return err
6 changes: 3 additions & 3 deletions cmd/backup-manager/app/clean/clean.go
@@ -92,7 +92,7 @@ func (bo *Options) deleteSnapshotsAndBackupMeta(ctx context.Context, backup *v1a
return err
}

metaInfo := &util.EBSBasedBRMeta{}
metaInfo := &bkutil.EBSBasedBRMeta{}
if err = json.Unmarshal(contents, metaInfo); err != nil {
klog.Errorf("rclone copy remote backupmeta to local failure.")
return err
@@ -112,7 +112,7 @@ func (bo *Options) deleteSnapshotsAndBackupMeta(ctx context.Context, backup *v1a
return nil
}

- func (bo *Options) deleteVolumeSnapshots(meta *util.EBSBasedBRMeta) error {
+ func (bo *Options) deleteVolumeSnapshots(meta *bkutil.EBSBasedBRMeta) error {
newVolumeIDMap := make(map[string]string)
for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
@@ -122,7 +122,7 @@ func (bo *Options) deleteVolumeSnapshots(meta *util.EBSBasedBRMeta) error {
}
}

- ec2Session, err := util.NewEC2Session(CloudAPIConcurrency)
+ ec2Session, err := bkutil.NewEC2Session(CloudAPIConcurrency)
if err != nil {
klog.Errorf("new a ec2 session failure.")
return err
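Note: the change in clean.go is purely an import-alias rename; EBSBasedBRMeta and NewEC2Session themselves are unchanged. A sketch of the aliased import this implies (the alias comes from the identifiers in the diff; clean.go's full import list is not shown, and the pkg/backup/util path is an assumption based on the other files in this PR):

import (
	bkutil "github.com/pingcap/tidb-operator/pkg/backup/util"
)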
1 change: 1 addition & 0 deletions pkg/backup/constants/constants.go
@@ -97,4 +97,5 @@ const (
LocalTmp = "/tmp"
ClusterBackupMeta = "clustermeta"
ClusterRestoreMeta = "restoremeta"
+ MetaFile = "backupmeta"
)
55 changes: 51 additions & 4 deletions pkg/backup/restore/restore_manager.go
@@ -104,8 +104,19 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error {
}

if restore.Spec.BR != nil && restore.Spec.Mode == v1alpha1.RestoreModeVolumeSnapshot {
- // restore based snapshot for cloud provider
- reason, err := rm.tryRestoreIfCanSnapshot(restore, tc)
+ err = rm.validateRestore(restore, tc)
+
+ if err != nil {
+ rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
+ Type: v1alpha1.RestoreInvalid,
+ Status: corev1.ConditionTrue,
+ Reason: "InvalidSpec",
+ Message: err.Error(),
+ }, nil)
+ return err
+ }
+ // restore based on volume snapshot for cloud provider
+ reason, err := rm.volSnapshotRestore(restore, tc)
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
@@ -206,7 +217,7 @@ func (rm *restoreManager) readRestoreMetaFromExternalStorage(r *v1alpha1.Restore
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Minute*1))
defer cancel()

- // write a file into external storage
+ // read the restore meta from the output of BR's first restore
klog.Infof("read the restore meta from external storage")
cred := backuputil.GetStorageCredential(r.Namespace, r.Spec.StorageProvider, rm.deps.SecretLister)
externalStorage, err := backuputil.NewStorageBackend(r.Spec.StorageProvider, cred)
@@ -236,8 +247,44 @@ func (rm *restoreManager) readRestoreMetaFromExternalStorage(r *v1alpha1.Restore

return csb, "", nil
}
+ func (rm *restoreManager) validateRestore(r *v1alpha1.Restore, tc *v1alpha1.TidbCluster) error {
+ // check tiflash replicas
+ replicas, reason, err := rm.readTiFlashReplicasFromBackupMeta(r)
+ if err != nil {
+ klog.Errorf("read tiflash replicas failed, reason: %s", reason)
+ return err
+ }
+
+ if tc.Spec.TiFlash == nil {
+ if replicas != 0 {
+ klog.Errorf("tiflash is not configured, backupmeta has %d tiflash", replicas)
+ return fmt.Errorf("tiflash replica mismatched")
+ }
+
+ } else {
+ if tc.Spec.TiFlash.Replicas != replicas {
+ klog.Errorf("cluster has %d tiflash configured, backupmeta has %d tiflash", tc.Spec.TiFlash.Replicas, replicas)
+ return fmt.Errorf("tiflash replica mismatched")
+ }
+ }
+
+ return nil
+ }
+
+ func (rm *restoreManager) readTiFlashReplicasFromBackupMeta(r *v1alpha1.Restore) (int32, string, error) {
+ metaInfo, err := backuputil.GetVolSnapBackupMetaData(r, rm.deps.SecretLister)
+ if err != nil {
+ return 0, "GetVolSnapBackupMetaData failed", err
+ }
+
+ if metaInfo.KubernetesMeta.TiDBCluster.Spec.TiFlash == nil {
+ return 0, "", nil
+ }
+
+ return metaInfo.KubernetesMeta.TiDBCluster.Spec.TiFlash.Replicas, "", nil
+ }

- func (rm *restoreManager) tryRestoreIfCanSnapshot(r *v1alpha1.Restore, tc *v1alpha1.TidbCluster) (string, error) {
+ func (rm *restoreManager) volSnapshotRestore(r *v1alpha1.Restore, tc *v1alpha1.TidbCluster) (string, error) {
if v1alpha1.IsRestoreComplete(r) {
return "", nil
}
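The new validateRestore gate fails fast when the backup and the restore target disagree about TiFlash: a target without TiFlash may only restore a backup that had none, and otherwise the replica counts must match exactly. A minimal standalone sketch of that decision, as a hypothetical helper rather than code from this PR:

package main

import "fmt"

// validateTiFlashReplicas mirrors the check in validateRestore above.
// backupReplicas is 0 when the backed-up cluster had no TiFlash;
// clusterHasTiFlash reports whether the restore target configures TiFlash.
func validateTiFlashReplicas(backupReplicas, clusterReplicas int32, clusterHasTiFlash bool) error {
	if !clusterHasTiFlash {
		if backupReplicas != 0 {
			return fmt.Errorf("tiflash replica mismatched: backup has %d, cluster has none", backupReplicas)
		}
		return nil
	}
	if clusterReplicas != backupReplicas {
		return fmt.Errorf("tiflash replica mismatched: backup has %d, cluster has %d", backupReplicas, clusterReplicas)
	}
	return nil
}

func main() {
	fmt.Println(validateTiFlashReplicas(3, 3, true))  // <nil>
	fmt.Println(validateTiFlashReplicas(3, 0, false)) // mismatch error
}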
7 changes: 7 additions & 0 deletions pkg/backup/restore/restore_manager_test.go
@@ -424,9 +424,16 @@ func TestBRRestoreByEBS(t *testing.T) {
//generate the restore meta in local nfs
err := os.WriteFile("/tmp/restoremeta", []byte(testutils.ConstructRestoreMetaStr()), 0644) //nolint:gosec
g.Expect(err).To(Succeed())

+ //generate the backup meta in local nfs; the tiflash check needs backupmeta for validation
+ err = os.WriteFile("/tmp/backupmeta", []byte(testutils.ConstructRestoreMetaStr()), 0644) //nolint:gosec
+ g.Expect(err).To(Succeed())
defer func() {
err = os.Remove("/tmp/restoremeta")
g.Expect(err).To(Succeed())

err = os.Remove("/tmp/backupmeta")
g.Expect(err).To(Succeed())
}()

for _, tt := range cases {
4 changes: 0 additions & 4 deletions pkg/backup/snapshotter/snapshotter.go
@@ -536,10 +536,6 @@ func resetVolumeBindingInfo(pvc *corev1.PersistentVolumeClaim, pv *corev1.Persis
pv.Spec.ClaimRef.ResourceVersion = ""
pv.Spec.ClaimRef.UID = ""
}

- // Remove the provisioned-by annotation which signals that the persistent
- // volume was dynamically provisioned; it is now statically provisioned.
- delete(pv.Annotations, constants.KubeAnnDynamicallyProvisioned)
}

func resetMetadataAndStatus(
9 changes: 6 additions & 3 deletions pkg/backup/snapshotter/snapshotter_test.go
@@ -1146,7 +1146,8 @@ func TestProcessCSBPVCsAndPVs(t *testing.T) {
"test/label": "retained",
},
Annotations: map[string]string{
"test/annotation": "retained",
constants.KubeAnnDynamicallyProvisioned: "ebs.csi.aws.com",
"test/annotation": "retained",
},
},
Spec: corev1.PersistentVolumeSpec{
@@ -1169,7 +1170,8 @@ func TestProcessCSBPVCsAndPVs(t *testing.T) {
"test/label": "retained",
},
Annotations: map[string]string{
"test/annotation": "retained",
constants.KubeAnnDynamicallyProvisioned: "ebs.csi.aws.com",
"test/annotation": "retained",
},
},
Spec: corev1.PersistentVolumeSpec{
@@ -1192,7 +1194,8 @@ func TestProcessCSBPVCsAndPVs(t *testing.T) {
"test/label": "retained",
},
Annotations: map[string]string{
"test/annotation": "retained",
constants.KubeAnnDynamicallyProvisioned: "ebs.csi.aws.com",
"test/annotation": "retained",
},
},
Spec: corev1.PersistentVolumeSpec{
@@ -19,8 +19,11 @@ import (
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
)

@@ -63,11 +66,11 @@ type ClusterInfo struct {
Replicas map[string]uint64 `json:"replicas" toml:"replicas"`
}

- type Kubernetes struct {
- PVs []interface{} `json:"pvs" toml:"pvs"`
- PVCs []interface{} `json:"pvcs" toml:"pvcs"`
- CRD interface{} `json:"crd_tidb_cluster" toml:"crd_tidb_cluster"`
- Options map[string]interface{} `json:"options" toml:"options"`
+ type KubernetesBackup struct {
+ PVCs []*corev1.PersistentVolumeClaim `json:"pvcs"`
+ PVs []*corev1.PersistentVolume `json:"pvs"`
+ TiDBCluster *v1alpha1.TidbCluster `json:"crd_tidb_cluster"`
+ Unstructured *unstructured.Unstructured `json:"options"`
}

type TiKVComponent struct {
@@ -88,7 +91,7 @@ type EBSBasedBRMeta struct {
TiKVComponent *TiKVComponent `json:"tikv" toml:"tikv"`
TiDBComponent *TiDBComponent `json:"tidb" toml:"tidb"`
PDComponent *PDComponent `json:"pd" toml:"pd"`
- KubernetesMeta *Kubernetes `json:"kubernetes" toml:"kubernetes"`
+ KubernetesMeta *KubernetesBackup `json:"kubernetes" toml:"kubernetes"`
Options map[string]interface{} `json:"options" toml:"options"`
Region string `json:"region" toml:"region"`
}
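Replacing the untyped []interface{} fields with typed corev1 and v1alpha1 pointers is what lets readTiFlashReplicasFromBackupMeta walk KubernetesMeta.TiDBCluster.Spec.TiFlash directly instead of re-decoding maps. A rough sketch of the typed decode, using the json tags from the struct above (the payload is a made-up minimal example, not a real backupmeta):

// assumes the package defining EBSBasedBRMeta, plus encoding/json and fmt imported
payload := []byte(`{"kubernetes":{"crd_tidb_cluster":{"spec":{"tiflash":{"replicas":3}}}}}`)
meta := &EBSBasedBRMeta{}
if err := json.Unmarshal(payload, meta); err != nil {
	panic(err)
}
if tf := meta.KubernetesMeta.TiDBCluster.Spec.TiFlash; tf != nil {
	fmt.Println(tf.Replicas) // 3
}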
57 changes: 57 additions & 0 deletions pkg/backup/util/util.go
@@ -14,18 +14,23 @@
package util

import (
"context"
"encoding/json"
"fmt"
"net/url"
"path"
"strings"
"time"
"unsafe"

"github.com/Masterminds/semver"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/apis/util/config"
"github.com/pingcap/tidb-operator/pkg/backup/constants"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
corelisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)

@@ -732,3 +737,55 @@ func GetOptions(provider v1alpha1.StorageProvider) []string {
return nil
}
}

+ // GetVolSnapBackupMetaData gets backup metadata from cloud storage
+ func GetVolSnapBackupMetaData(r *v1alpha1.Restore, secretLister corelisterv1.SecretLister) (*EBSBasedBRMeta, error) {
+ // since the restore meta is small (~5M), assume 1 minute is enough
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Minute*1))
+ defer cancel()
+
+ klog.Infof("read the backup meta from external storage")
+ cred := GetStorageCredential(r.Namespace, r.Spec.StorageProvider, secretLister)
+ s, err := NewStorageBackend(r.Spec.StorageProvider, cred)
+ if err != nil {
+ return nil, err
+ }
+ defer s.Close()
+
+ var metaInfo []byte
+ // use exponential backoff, every retry duration is duration * factor ^ (used_step - 1)
+ backoff := wait.Backoff{
+ Duration: time.Second,
+ Steps: 6,
+ Factor: 2.0,
+ Cap: time.Minute,
+ }
+ readBackupMeta := func() error {
+ exist, err := s.Exists(ctx, constants.MetaFile)
+ if err != nil {
+ return err
+ }
+ if !exist {
+ return fmt.Errorf("%s does not exist", constants.MetaFile)
+ }
+ metaInfo, err = s.ReadAll(ctx, constants.MetaFile)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+ isRetry := func(err error) bool {
+ return !strings.Contains(err.Error(), "not exist")
+ }
+ err = retry.OnError(backoff, isRetry, readBackupMeta)
+ if err != nil {
+ return nil, fmt.Errorf("read backup meta from bucket %s and prefix %s, err: %v", s.GetBucket(), s.GetPrefix(), err)
+ }
+
+ backupMeta := &EBSBasedBRMeta{}
+ err = json.Unmarshal(metaInfo, backupMeta)
+ if err != nil {
+ return nil, fmt.Errorf("unmarshal backup meta from bucket %s and prefix %s, err: %v", s.GetBucket(), s.GetPrefix(), err)
+ }
+ return backupMeta, nil
+ }
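For reference, retry.OnError sleeps according to the wait.Backoff above between attempts; with these parameters the delay doubles from one second and the one-minute cap is never reached (1s + 2s + 4s + 8s + 16s + 32s, about 63s over six attempts; note the surrounding context timeout is also one minute). A tiny standalone sketch that prints the schedule:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// same parameters as GetVolSnapBackupMetaData above
	b := wait.Backoff{Duration: time.Second, Steps: 6, Factor: 2.0, Cap: time.Minute}
	for b.Steps > 0 {
		fmt.Println(b.Step()) // 1s, 2s, 4s, 8s, 16s, 32s
	}
}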
39 changes: 39 additions & 0 deletions pkg/manager/member/tiflash_member_manager.go
@@ -91,6 +91,11 @@ func (m *tiflashMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
return nil
}

+ if err := m.syncRecoveryForTiFlash(tc); err != nil {
+ klog.Infof("sync recovery for TiFlash: %s", err.Error())
+ return nil
+ }

err = m.enablePlacementRules(tc)
if err != nil {
klog.Errorf("Enable placement rules failed, error: %v", err)
@@ -104,6 +109,40 @@ func (m *tiflashMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
return m.syncStatefulSet(tc)
}

+ func (m *tiflashMemberManager) syncRecoveryForTiFlash(tc *v1alpha1.TidbCluster) error {
+ // Check whether the cluster is in recovery mode
+ // and whether the volumes have been restored for TiKV
+ if !tc.Spec.RecoveryMode {
+ return nil
+ }
+
+ ns := tc.GetNamespace()
+ tcName := tc.GetName()
+ anns := tc.GetAnnotations()
+ if _, ok := anns[label.AnnTiKVVolumesReadyKey]; !ok {
+ return controller.RequeueErrorf("TidbCluster: [%s/%s], TiFlash is waiting for TiKV volumes to be ready", ns, tcName)
+ }
+
+ if mark, _ := m.checkRecoveringMark(tc); mark {
+ return controller.RequeueErrorf("TidbCluster: [%s/%s], TiFlash is waiting for the recovering mark to be cleared", ns, tcName)
+ }
+
+ return nil
+ }
+
+ // checkRecoveringMark checks the recovering mark from PD.
+ // A volume-snapshot restore requires PD's allocate ID to be set before TiFlash starts,
+ // so that TiFlash store IDs cannot conflict with TiKV's.
+ // The PD recovering mark indicates the allocate ID has been set properly.
+ func (m *tiflashMemberManager) checkRecoveringMark(tc *v1alpha1.TidbCluster) (bool, error) {
+ pdCli := controller.GetPDClient(m.deps.PDControl, tc)
+ mark, err := pdCli.GetRecoveringMark()
+ if err != nil {
+ return false, err
+ }
+
+ return mark, nil
+ }

func (m *tiflashMemberManager) enablePlacementRules(tc *v1alpha1.TidbCluster) error {
pdCli := controller.GetPDClient(m.deps.PDControl, tc)
config, err := pdCli.GetConfig()
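Distilled, the gate above is a pure condition: in recovery mode TiFlash may only proceed once TiKV volumes are restored and PD has cleared the recovering mark. A hypothetical restatement as a standalone decision function (not code from this PR):

// tiflashMayStart restates syncRecoveryForTiFlash's conditions as a pure function.
func tiflashMayStart(recoveryMode, tikvVolumesReady, recoveringMark bool) bool {
	if !recoveryMode {
		return true // normal clusters are unaffected
	}
	// During a volume-snapshot restore, TiFlash waits until TiKV volumes are
	// restored and PD's allocate ID is set (recovering mark cleared), so its
	// store IDs cannot collide with TiKV's.
	return tikvVolumesReady && !recoveringMark
}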
11 changes: 11 additions & 0 deletions pkg/pdapi/fake_pdapi.go
@@ -43,6 +43,7 @@ const (
GetPDLeaderActionType ActionType = "GetPDLeader"
TransferPDLeaderActionType ActionType = "TransferPDLeader"
GetAutoscalingPlansActionType ActionType = "GetAutoscalingPlans"
+ GetRecoveringMarkActionType ActionType = "GetRecoveringMark"
)

type NotFoundReaction struct {
@@ -270,3 +271,13 @@ func (c *FakePDClient) GetAutoscalingPlans(strategy Strategy) ([]Plan, error) {
}
return nil, nil
}

+ func (c *FakePDClient) GetRecoveringMark() (bool, error) {
+ action := &Action{}
+ _, err := c.fakeAPI(GetRecoveringMarkActionType, action)
+ if err != nil {
+ return false, err
+ }
+
+ return true, nil
+ }
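A sketch of how a member-manager test might drive this stub, assuming the fake's existing NewFakePDClient constructor and AddReaction helper (neither is shown in this diff):

pdClient := pdapi.NewFakePDClient()
pdClient.AddReaction(pdapi.GetRecoveringMarkActionType, func(action *pdapi.Action) (interface{}, error) {
	return nil, nil // succeed; the stub above then reports the mark as still set
})
mark, _ := pdClient.GetRecoveringMark()
// mark == true: syncRecoveryForTiFlash keeps requeueing the TidbCluster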