Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ebs: calc full snapshot size and report backup size for failed backup #5007

Merged
merged 7 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,27 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
// run br binary to do the real job
backupErr := bm.backupData(ctx, backup, bm.StatusUpdater)

defer func() {
// Calculate the backup size for ebs backup job even if it fails
if bm.Mode == string(v1alpha1.BackupModeVolumeSnapshot) && !bm.Initialize {
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)
if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
return
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

updateStatus := &controller.BackupUpdateStatus{
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
}

bm.StatusUpdater.Update(backup, nil,
BornChanger marked this conversation as resolved.
Show resolved Hide resolved
updateStatus)
}
}()

if db != nil && oldTikvGCTimeDuration < tikvGCTimeDuration {
// use another context to revert `tikv_gc_life_time` back.
// `DefaultTerminationGracePeriodSeconds` for a pod is 30, so we use a smaller timeout value here.
Expand All @@ -328,6 +349,7 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
}
errs = append(errs, err)
klog.Errorf("cluster %s reset tikv GC life time to %s failed, err: %s", bm, oldTikvGCTime, err)

uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Expand Down Expand Up @@ -370,19 +392,9 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
completeCondition = v1alpha1.VolumeBackupComplete
// In volume snapshot mode, commitTS have been updated according to the
// br command output, so we don't need to update it here.
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)

if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

updateStatus = &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
}
}
default:
Expand Down
29 changes: 0 additions & 29 deletions cmd/backup-manager/app/clean/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"sort"

"github.com/dustin/go-humanize"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
Expand Down Expand Up @@ -89,11 +88,6 @@ func (bm *Manager) performCleanBackup(ctx context.Context, backup *v1alpha1.Back
klog.Errorf("delete backup %s for cluster %s backup failure", backup.Name, bm)
}

// update the next backup size
if nextNackup != nil {
bm.updateVolumeSnapshotBackupSize(ctx, nextNackup)
}

} else {
if backup.Spec.BR != nil {
err = bm.CleanBRRemoteBackupData(ctx, backup)
Expand Down Expand Up @@ -156,26 +150,3 @@ func (bm *Manager) getVolumeSnapshotBackup(backups []*v1alpha1.Backup) *v1alpha1
// reach end of backup list, there is no volume snapshot backups
return nil
}

// updateVolumeSnapshotBackupSize recalculates the size of a volume-snapshot
// backup and records it in the backup's status.
//
// The size comes from util.CalcVolSnapBackupSize, called with the backup's
// storage provider. If that calculation fails, the status is left untouched
// and the error is returned, so a failed calculation is never published as a
// zero-sized, completed backup. On success the raw size and its
// human-readable form are written through bm.StatusUpdater together with a
// BackupComplete condition, and the result of that update is returned.
func (bm *Manager) updateVolumeSnapshotBackupSize(ctx context.Context, backup *v1alpha1.Backup) error {
	backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)
	if err != nil {
		// Logged here as well as returned: the visible caller discards the
		// return value, so the log is the only trace of the failure.
		klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
		return err
	}

	backupSizeReadable := humanize.Bytes(uint64(backupSize))
	updateStatus := &controller.BackupUpdateStatus{
		BackupSize:         &backupSize,
		BackupSizeReadable: &backupSizeReadable,
	}

	return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
		Type:   v1alpha1.BackupComplete,
		Status: corev1.ConditionTrue,
	}, updateStatus)
}
179 changes: 7 additions & 172 deletions cmd/backup-manager/app/util/backup_size.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ebs"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
Expand All @@ -44,15 +42,8 @@ import (
// interface CalcVolSnapBackupSize called by backup and backup clean.

const (
// This value can be between 5 and 1,000; if MaxResults is given a value larger than 1,000, only 1,000 results are returned.
DescribeSnapMaxReturnResult = 1000

// This value can be between 100 and 10,000; requests are charged at ~$0.6 per 1 million requests
ListSnapMaxReturnResult = 10000

// This value can be between 100 and 10,000; requests are charged at ~$0.6 per 1 million requests
ListBlocksMaxReturnResult = 10000

// This value can be between 1 and 50 due to aws service quota
EbsApiConcurrency = 40
)
Expand All @@ -66,14 +57,11 @@ func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvide
return 0, err
}

// get all snapshots per backup volume from aws
snapshots, err := getBackupVolSnapshots(volSnapshots)

if err != nil {
return 0, err
}

backupSize, err := calcBackupSize(ctx, volSnapshots, snapshots)
backupSize, err := calcBackupSize(ctx, volSnapshots)

if err != nil {
return 0, err
Expand Down Expand Up @@ -142,105 +130,22 @@ func getSnapshotsFromBackupmeta(ctx context.Context, provider v1alpha1.StoragePr
return volumeIDMap, nil
}

// getBackupVolSnapshots returns a volume-to-snapshots map of the form
// map[volumeId]{snapshot1, snapshot2, snapshot3}.
//
// Only the keys of volumes (the volume IDs) are used. Every volume ID gets an
// entry in the result, which stays an empty slice when no snapshot is found.
// Snapshots are fetched with DescribeSnapshots, restricted to snapshots owned
// by "self" and filtered by the given volume IDs; pagination is followed until
// NextToken is nil. Snapshots whose state is not "completed" are skipped.
//
// An error is returned when the EC2 session cannot be created or when a
// DescribeSnapshots call fails.
func getBackupVolSnapshots(volumes map[string]string) (map[string][]*ec2.Snapshot, error) {
	volWithTheirSnapshots := make(map[string][]*ec2.Snapshot, len(volumes))
	if len(volumes) == 0 {
		// Nothing to look up; avoid issuing a request with an empty filter.
		return volWithTheirSnapshots, nil
	}

	// read all snapshots from aws
	ec2Session, err := util.NewEC2Session(util.CloudAPIConcurrency)
	if err != nil {
		klog.Errorf("new a ec2 session failure.")
		return nil, err
	}

	// Build the filter values and pre-create one result entry per volume so
	// that membership in the map identifies the volumes we asked for.
	volValues := make([]*string, 0, len(volumes))
	for volumeID := range volumes {
		volValues = append(volValues, aws.String(volumeID))
		volWithTheirSnapshots[volumeID] = make([]*ec2.Snapshot, 0)
	}

	filters := []*ec2.Filter{{Name: aws.String("volume-id"), Values: volValues}}

	// DescribeSnapshots is a heavy operation, so all volumes are queried with
	// a single filter and the paginated results are walked page by page.
	var nextToken *string
	for {
		resp, err := ec2Session.EC2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{
			OwnerIds:   aws.StringSlice([]string{"self"}),
			MaxResults: aws.Int64(DescribeSnapMaxReturnResult),
			Filters:    filters,
			NextToken:  nextToken,
		})
		if err != nil {
			return nil, err
		}

		for i, s := range resp.Snapshots {
			// aws.StringValue tolerates nil pointer fields instead of panicking.
			snapshotID := aws.StringValue(s.SnapshotId)

			// skip ongoing snapshots
			if aws.StringValue(s.State) != ec2.SnapshotStateCompleted {
				klog.Infof("the snapshot#%d %s creating... skip it", i, snapshotID)
				continue
			}

			volumeID := aws.StringValue(s.VolumeId)
			if _, requested := volWithTheirSnapshots[volumeID]; !requested {
				// Skip only this snapshot; the rest of the page may still hold
				// valid snapshots for the requested volumes.
				klog.Errorf("search with filter[volume-id] received unexpected result, volumeId:%s, snapshotId:%s", volumeID, snapshotID)
				continue
			}

			klog.Infof("the snapshot#%d %s created for volume %s", i, snapshotID, volumeID)
			volWithTheirSnapshots[volumeID] = append(volWithTheirSnapshots[volumeID], s)
		}

		// check if there's more to retrieve
		if resp.NextToken == nil {
			break
		}
		klog.Infof("the total number of snapshot is %d", len(resp.Snapshots))
		nextToken = resp.NextToken
	}

	return volWithTheirSnapshots, nil
}

// calcBackupSize get a volue-snapshots backup size
func calcBackupSize(ctx context.Context, volumes map[string]string, snapshots map[string][]*ec2.Snapshot) (uint64, error) {
// calcBackupSize get a volume-snapshots backup size
func calcBackupSize(ctx context.Context, volumes map[string]string) (uint64, error) {
var backupSize uint64
var apiReqCount uint64

workerPool := util.NewWorkerPool(EbsApiConcurrency, "list snapshot size")
eg, _ := errgroup.WithContext(ctx)

for volumeId, id := range volumes {
volSnapshots := snapshots[volumeId]
for _, id := range volumes {
snapshotId := id
// sort snapshots by timestamp
workerPool.ApplyOnErrorGroup(eg, func() error {
// get prev snapshot backup
prevSnapshot, err := getPrevSnapshotId(snapshotId, volSnapshots)
snapSize, apiReq, err := calculateSnapshotSize(snapshotId)
if err != nil {
return err
}

// full/initial snapshot backup
if prevSnapshot == "" {
snapSize, apiReq, err := initialSnapshotSize(snapshotId)
if err != nil {
return err
}

atomic.AddUint64(&backupSize, snapSize)
atomic.AddUint64(&apiReqCount, apiReq)
return nil
}

snapSize, apiReq, err := changedBlocksSize(prevSnapshot, snapshotId)
if err != nil {
return err
}

atomic.AddUint64(&backupSize, snapSize)
atomic.AddUint64(&apiReqCount, apiReq)
return nil
Expand All @@ -257,9 +162,8 @@ func calcBackupSize(ctx context.Context, volumes map[string]string, snapshots ma
return backupSize, nil
}

// initialSnapshotSize calculate size of an initial snapshot in bytes by listing its blocks.
// An initial snapshot is always a full backup of the volume.
func initialSnapshotSize(snapshotId string) (uint64, uint64, error) {
// calculateSnapshotSize calculates the size of a snapshot in bytes by listing its blocks.
func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) {
var snapshotSize uint64
var numApiReq uint64
ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
Expand Down Expand Up @@ -293,72 +197,3 @@ func initialSnapshotSize(snapshotId string) (uint64, uint64, error) {
klog.Infof("full backup snapshot size %d bytes, num of ListSnapshotBlocks request %d", snapshotSize, numApiReq)
return snapshotSize, numApiReq, nil
}

// getPrevSnapshotId returns the ID of the snapshot that immediately precedes
// snapshotId among volSnapshots when ordered by ascending StartTime.
//
// volSnapshots is sorted in place. The function returns:
//   - "" and a nil error when snapshotId is the earliest snapshot in the list;
//   - the preceding snapshot's ID when one exists;
//   - an error when snapshotId is not present in volSnapshots (or the
//     preceding snapshot carries an empty ID).
func getPrevSnapshotId(snapshotId string, volSnapshots []*ec2.Snapshot) (string, error) {
	sort.Slice(volSnapshots, func(i, j int) bool {
		return volSnapshots[i].StartTime.Before(*volSnapshots[j].StartTime)
	})

	for i, snapshot := range volSnapshots {
		if snapshotId != *snapshot.SnapshotId {
			continue
		}
		// the first snapshot for the volume
		if i == 0 {
			return "", nil
		}

		prevSnapshotId := *volSnapshots[i-1].SnapshotId
		if prevSnapshotId == "" {
			// An empty ID is treated as "not found" below, never as "no predecessor".
			break
		}
		// Report the predecessor's own index and ID, not the current snapshot's.
		klog.Infof("the prevSnapshot index is %d, ID is %s", i-1, prevSnapshotId)
		return prevSnapshotId, nil
	}

	return "", fmt.Errorf("Could not find the prevousely snapshot id, current snapshotId: %s.", snapshotId)
}

// changedBlocksSize calculates the total size in bytes of the blocks that
// changed between two snapshots with common ancestry.
//
// preSnapshotId is passed as the first (older) snapshot and snapshotId as the
// second one to ListChangedBlocks; pagination is followed until NextToken is
// nil. A block is counted when it has a SecondBlockToken, i.e. it was changed
// or exists only in the second snapshot.
//
// It returns the accumulated size, the number of ListChangedBlocks requests
// issued (including a failing one), and an error when the EBS session cannot
// be created or a request fails; the size is 0 in the error case.
func changedBlocksSize(preSnapshotId string, snapshotId string) (uint64, uint64, error) {
	var snapshotSize uint64
	var numApiReq uint64

	klog.Infof("the calc snapshot size for %s, base on prev snapshot %s", snapshotId, preSnapshotId)
	ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
	if err != nil {
		klog.Errorf("new a ebs session failure.")
		return 0, numApiReq, err
	}

	var nextToken *string
	for {
		resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
			FirstSnapshotId:  aws.String(preSnapshotId),
			MaxResults:       aws.Int64(ListBlocksMaxReturnResult),
			SecondSnapshotId: aws.String(snapshotId),
			NextToken:        nextToken,
		})
		numApiReq++
		if err != nil {
			return 0, numApiReq, err
		}

		// The block size is a property of the response, not of each block, so
		// it is checked once per page; without it nothing can be counted.
		if resp.BlockSize != nil {
			blockSize := uint64(*resp.BlockSize)
			// retrieve only changed blocks and blocks that exist only in the
			// current snapshot (newly added blocks)
			for _, block := range resp.ChangedBlocks {
				if block.SecondBlockToken != nil {
					snapshotSize += blockSize
				}
			}
		}

		// check if there is more to retrieve
		if resp.NextToken == nil {
			break
		}
		nextToken = resp.NextToken
	}

	klog.Infof("the total size of snapshot %d, num of api ListChangedBlocks request %d, snapshot id %s", snapshotSize, numApiReq, snapshotId)
	return snapshotSize, numApiReq, nil
}
2 changes: 1 addition & 1 deletion pkg/backup/util/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ type BatchDeleteObjectsResult struct {
Errors []ObjectError
}

// BatchDeleteObjects delete mutli objects
// BatchDeleteObjects delete multi objects
//
// Depending on storage type, it use function 'BatchDeleteObjectsOfS3' or 'BatchDeleteObjectsConcurrently'
func (b *StorageBackend) BatchDeleteObjects(ctx context.Context, objs []*blob.ListObject, opt v1alpha1.BatchDeleteOption) *BatchDeleteObjectsResult {
Expand Down