diff --git a/cmd/backup-manager/app/backup/manager.go b/cmd/backup-manager/app/backup/manager.go index 99164c0917..df3b493af4 100644 --- a/cmd/backup-manager/app/backup/manager.go +++ b/cmd/backup-manager/app/backup/manager.go @@ -316,6 +316,27 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d // run br binary to do the real job backupErr := bm.backupData(ctx, backup, bm.StatusUpdater) + defer func() { + // Calculate the backup size for ebs backup job even if it fails + if bm.Mode == string(v1alpha1.BackupModeVolumeSnapshot) && !bm.Initialize { + backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider) + if err != nil { + klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err) + return + } + + backupSizeReadable := humanize.Bytes(uint64(backupSize)) + + updateStatus := &controller.BackupUpdateStatus{ + BackupSize: &backupSize, + BackupSizeReadable: &backupSizeReadable, + } + + bm.StatusUpdater.Update(backup, nil, + updateStatus) + } + }() + if db != nil && oldTikvGCTimeDuration < tikvGCTimeDuration { // use another context to revert `tikv_gc_life_time` back. // `DefaultTerminationGracePeriodSeconds` for a pod is 30, so we use a smaller timeout value here. @@ -328,6 +349,7 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d } errs = append(errs, err) klog.Errorf("cluster %s reset tikv GC life time to %s failed, err: %s", bm, oldTikvGCTime, err) + uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{ Type: v1alpha1.BackupFailed, Status: corev1.ConditionTrue, @@ -370,19 +392,9 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d completeCondition = v1alpha1.VolumeBackupComplete // In volume snapshot mode, commitTS have been updated according to the // br command output, so we don't need to update it here. - backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider) - - if err != nil { - klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err) - } - - backupSizeReadable := humanize.Bytes(uint64(backupSize)) - updateStatus = &controller.BackupUpdateStatus{ - TimeStarted: &metav1.Time{Time: started}, - TimeCompleted: &metav1.Time{Time: time.Now()}, - BackupSize: &backupSize, - BackupSizeReadable: &backupSizeReadable, + TimeStarted: &metav1.Time{Time: started}, + TimeCompleted: &metav1.Time{Time: time.Now()}, } } default: diff --git a/cmd/backup-manager/app/clean/manager.go b/cmd/backup-manager/app/clean/manager.go index fce0685ab6..f0e88fcdf3 100644 --- a/cmd/backup-manager/app/clean/manager.go +++ b/cmd/backup-manager/app/clean/manager.go @@ -18,7 +18,6 @@ import ( "fmt" "sort" - "github.com/dustin/go-humanize" "github.com/pingcap/tidb-operator/cmd/backup-manager/app/util" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1" @@ -89,11 +88,6 @@ func (bm *Manager) performCleanBackup(ctx context.Context, backup *v1alpha1.Back klog.Errorf("delete backup %s for cluster %s backup failure", backup.Name, bm) } - // update the next backup size - if nextNackup != nil { - bm.updateVolumeSnapshotBackupSize(ctx, nextNackup) - } - } else { if backup.Spec.BR != nil { err = bm.CleanBRRemoteBackupData(ctx, backup) @@ -156,26 +150,3 @@ func (bm *Manager) getVolumeSnapshotBackup(backups []*v1alpha1.Backup) *v1alpha1 // reach end of backup list, there is no volume snapshot backups return nil } - -// updateVolumeSnapshotBackupSize update a volume-snapshot backup size -func (bm *Manager) updateVolumeSnapshotBackupSize(ctx context.Context, backup *v1alpha1.Backup) error { - var updateStatus *controller.BackupUpdateStatus - - backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider) - - if err != nil { - klog.Warningf("Failed to parse BackupSize %d KB, %v", backupSize, err) - } - - backupSizeReadable := humanize.Bytes(uint64(backupSize)) - - updateStatus = &controller.BackupUpdateStatus{ - BackupSize: &backupSize, - BackupSizeReadable: &backupSizeReadable, - } - - return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{ - Type: v1alpha1.BackupComplete, - Status: corev1.ConditionTrue, - }, updateStatus) -} diff --git a/cmd/backup-manager/app/util/backup_size.go b/cmd/backup-manager/app/util/backup_size.go index 6848f8ff81..b8a015aac1 100644 --- a/cmd/backup-manager/app/util/backup_size.go +++ b/cmd/backup-manager/app/util/backup_size.go @@ -17,14 +17,12 @@ import ( "context" "encoding/json" "fmt" - "sort" "strings" "sync/atomic" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ebs" - "github.com/aws/aws-sdk-go/service/ec2" "github.com/pingcap/errors" "github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" @@ -44,15 +42,8 @@ import ( // interface CalcVolSnapBackupSize called by backup and backup clean. const ( - // This value can be between 5 and 1,000; if MaxResults is given a value larger than 1,000, only 1,000 results are returned. - DescribeSnapMaxReturnResult = 1000 - // This value can be between 100 and 1,0000, and charge ~0.6$/1 million request ListSnapMaxReturnResult = 10000 - - // This value can be between 100 and 1,0000, and charge ~0.6$/1 million request - ListBlocksMaxReturnResult = 10000 - // This value can be between 1 and 50 due to aws service quota EbsApiConcurrency = 40 ) @@ -66,14 +57,11 @@ func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvide return 0, err } - // get all snapshots per backup volume from aws - snapshots, err := getBackupVolSnapshots(volSnapshots) - if err != nil { return 0, err } - backupSize, err := calcBackupSize(ctx, volSnapshots, snapshots) + backupSize, err := calcBackupSize(ctx, volSnapshots) if err != nil { return 0, err @@ -142,105 +130,22 @@ func getSnapshotsFromBackupmeta(ctx context.Context, provider v1alpha1.StoragePr return volumeIDMap, nil } -// getBackupVolSnapshots get a volue-snapshots map contains map[volumeId]{snapshot1, snapshot2, snapshot3} -func getBackupVolSnapshots(volumes map[string]string) (map[string][]*ec2.Snapshot, error) { - volWithTheirSnapshots := make(map[string][]*ec2.Snapshot) - - // read all snapshots from aws - ec2Session, err := util.NewEC2Session(util.CloudAPIConcurrency) - if err != nil { - klog.Errorf("new a ec2 session failure.") - return nil, err - } - - // init search filter.Values - // init volWithTheirSnapshots - volValues := make([]*string, 0) - for volumeId := range volumes { - volValues = append(volValues, aws.String(volumeId)) - if volWithTheirSnapshots[volumeId] == nil { - volWithTheirSnapshots[volumeId] = make([]*ec2.Snapshot, 0) - } - } - - filters := []*ec2.Filter{{Name: aws.String("volume-id"), Values: volValues}} - // describe snapshot is heavy operator, try to call only once - // api has limit with max 1000 snapshots - // search with filter volume id the backupmeta contains - var nextToken *string - for { - resp, err := ec2Session.EC2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{ - OwnerIds: aws.StringSlice([]string{"self"}), - MaxResults: aws.Int64(DescribeSnapMaxReturnResult), - Filters: filters, - NextToken: nextToken, - }) - - if err != nil { - return nil, err - } - - for i, s := range resp.Snapshots { - if *s.State == ec2.SnapshotStateCompleted { - if volWithTheirSnapshots[*s.VolumeId] == nil { - klog.Errorf("search with filter[volume-id] received unexpected result, volumeId:%s, snapshotId:%s", *s.VolumeId, *s.SnapshotId) - break - } - klog.Infof("the snapshot#%d %s created for volume %s", i, *s.SnapshotId, *s.VolumeId) - volWithTheirSnapshots[*s.VolumeId] = append(volWithTheirSnapshots[*s.VolumeId], s) - } else { // skip ongoing snapshots - klog.Infof("the snapshot#%d %s creating... skip it", i, *s.SnapshotId) - continue - } - } - - // check if there's more to retrieve - if resp.NextToken == nil { - break - } - klog.Infof("the total number of snapshot is %d", len(resp.Snapshots)) - nextToken = resp.NextToken - } - - return volWithTheirSnapshots, nil -} - -// calcBackupSize get a volue-snapshots backup size -func calcBackupSize(ctx context.Context, volumes map[string]string, snapshots map[string][]*ec2.Snapshot) (uint64, error) { +// calcBackupSize get a volume-snapshots backup size +func calcBackupSize(ctx context.Context, volumes map[string]string) (uint64, error) { var backupSize uint64 var apiReqCount uint64 workerPool := util.NewWorkerPool(EbsApiConcurrency, "list snapshot size") eg, _ := errgroup.WithContext(ctx) - for volumeId, id := range volumes { - volSnapshots := snapshots[volumeId] + for _, id := range volumes { snapshotId := id // sort snapshots by timestamp workerPool.ApplyOnErrorGroup(eg, func() error { - // get prev snapshot backup - prevSnapshot, err := getPrevSnapshotId(snapshotId, volSnapshots) + snapSize, apiReq, err := calculateSnapshotSize(snapshotId) if err != nil { return err } - - // full/initial snapshot backup - if prevSnapshot == "" { - snapSize, apiReq, err := initialSnapshotSize(snapshotId) - if err != nil { - return err - } - - atomic.AddUint64(&backupSize, snapSize) - atomic.AddUint64(&apiReqCount, apiReq) - return nil - } - - snapSize, apiReq, err := changedBlocksSize(prevSnapshot, snapshotId) - if err != nil { - return err - } - atomic.AddUint64(&backupSize, snapSize) atomic.AddUint64(&apiReqCount, apiReq) return nil @@ -257,9 +162,8 @@ func calcBackupSize(ctx context.Context, volumes map[string]string, snapshots ma return backupSize, nil } -// initialSnapshotSize calculate size of an initial snapshot in bytes by listing its blocks. -// initial snapshot always a ful backup of volume -func initialSnapshotSize(snapshotId string) (uint64, uint64, error) { +// calculateSnapshotSize calculate size of an snapshot in bytes by listing its blocks. +func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) { var snapshotSize uint64 var numApiReq uint64 ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency) @@ -293,72 +197,3 @@ func initialSnapshotSize(snapshotId string) (uint64, uint64, error) { klog.Infof("full backup snapshot size %d bytes, num of ListSnapshotBlocks request %d", snapshotSize, numApiReq) return snapshotSize, numApiReq, nil } - -func getPrevSnapshotId(snapshotId string, volSnapshots []*ec2.Snapshot) (string, error) { - var prevSnapshotId string - - sort.Slice(volSnapshots, func(i, j int) bool { - return volSnapshots[i].StartTime.Before(*volSnapshots[j].StartTime) - }) - - for i, snapshot := range volSnapshots { - if snapshotId == *snapshot.SnapshotId { - // the first snapshot for the volume - if i == 0 { - return "", nil - } - prevSnapshotId = *volSnapshots[i-1].SnapshotId - klog.Infof("the prevSnapshot index is %d, ID is %s", i, *snapshot.SnapshotId) - break - } - } - if len(prevSnapshotId) == 0 { - return "", fmt.Errorf("Could not find the prevousely snapshot id, current snapshotId: %s.", snapshotId) - } - return prevSnapshotId, nil -} - -// changedBlocksSize calculates changed blocks total size in bytes between two snapshots with common ancestry. -func changedBlocksSize(preSnapshotId string, snapshotId string) (uint64, uint64, error) { - var numBlocks int - var snapshotSize uint64 - var numApiReq uint64 - - klog.Infof("the calc snapshot size for %s, base on prev snapshot %s", snapshotId, preSnapshotId) - ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency) - if err != nil { - klog.Errorf("new a ebs session failure.") - return 0, numApiReq, err - } - - var nextToken *string - - for { - resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{ - FirstSnapshotId: aws.String(preSnapshotId), - MaxResults: aws.Int64(ListBlocksMaxReturnResult), - SecondSnapshotId: aws.String(snapshotId), - NextToken: nextToken, - }) - numApiReq += 1 - if err != nil { - return 0, numApiReq, err - } - numBlocks += len(resp.ChangedBlocks) - - // retrieve only changed block and blocks only existed in current snapshot (new add blocks) - for _, block := range resp.ChangedBlocks { - if block.SecondBlockToken != nil && resp.BlockSize != nil { - snapshotSize += uint64(*resp.BlockSize) - } - } - - // check if there is more to retrieve - if resp.NextToken == nil { - break - } - nextToken = resp.NextToken - } - klog.Infof("the total size of snapshot %d, num of api ListChangedBlocks request %d, snapshot id %s", snapshotSize, numApiReq, snapshotId) - return snapshotSize, numApiReq, nil -} diff --git a/pkg/backup/util/remote.go b/pkg/backup/util/remote.go index c0551d7e32..6125375d32 100644 --- a/pkg/backup/util/remote.go +++ b/pkg/backup/util/remote.go @@ -215,7 +215,7 @@ type BatchDeleteObjectsResult struct { Errors []ObjectError } -// BatchDeleteObjects delete mutli objects +// BatchDeleteObjects delete multi objects // // Depending on storage type, it use function 'BatchDeleteObjectsOfS3' or 'BatchDeleteObjectsConcurrently' func (b *StorageBackend) BatchDeleteObjects(ctx context.Context, objs []*blob.ListObject, opt v1alpha1.BatchDeleteOption) *BatchDeleteObjectsResult {