Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: ebs tags refactoring (#44381) #44411

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 11 additions & 98 deletions br/pkg/aws/ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,8 @@ import (
)

const (
AnnPodNameKey string = "tidb.pingcap.com/pod-name"
AnnTemporaryVolumeID string = "temporary/volume-id"
EC2K8SClusterNameKey string = "aws:eks:cluster-name"

pollingPendingSnapshotInterval = 30 * time.Second
errCodeTooManyPendingSnapshots = "PendingSnapshotLimitExceeded"

SourcePvcNameKey string = "source/pvcName"
SourceVolumeIdKey string = "source/VolumeId"
SourceTikvNameKey string = "source/TikvName"
SourceNamespaceKey string = "source/Namespace"
SourceContextKey string = "source/context"
)

type EC2Session struct {
Expand All @@ -47,14 +37,6 @@ type EC2Session struct {

type VolumeAZs map[string]string

type SnapshotTags struct {
sourcePVCName string
sourceTiKVName string
sourceNameSpace string
}

type VolumeSnapshotTags map[string]SnapshotTags

func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
// aws-sdk has builtin exponential backoff retry mechanism, see:
// https://github.com/aws/aws-sdk-go/blob/db4388e8b9b19d34dcde76c492b17607cd5651e2/aws/client/default_retryer.go#L12-L16
Expand All @@ -71,66 +53,20 @@ func NewEC2Session(concurrency uint, region string) (*EC2Session, error) {
return &EC2Session{ec2: ec2Session, concurrency: concurrency}, nil
}

func GenerateVolumeSnapshotTags(backupInfo *config.EBSBasedBRMeta, pvVolumeMap map[string]string) (VolumeSnapshotTags, error) {
vst := make(VolumeSnapshotTags)
for j := range backupInfo.KubernetesMeta.PVCs {
pvc := backupInfo.KubernetesMeta.PVCs[j]
volID := pvVolumeMap[pvc.Spec.VolumeName]
if volID == "" {
return vst, errors.Errorf("No matching pv is found with name of [%s]", pvc.Spec.VolumeName)
}
vst[volID] = SnapshotTags{
pvc.GetName(),
pvc.GetLabels()[AnnPodNameKey],
pvc.GetNamespace(),
}
}
return vst, nil
}

// CreateSnapshots is the mainly steps to control the data volume snapshots.
func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[string]string, VolumeAZs, error) {
snapIDMap := make(map[string]string)
var volumeIDs []*string

var mutex sync.Mutex
eg, _ := errgroup.WithContext(context.Background())

pvVolumeMap := make(map[string]string)
for j := range backupInfo.KubernetesMeta.PVs {
pv := backupInfo.KubernetesMeta.PVs[j]
pvVolumeMap[pv.GetName()] = pv.GetAnnotations()[AnnTemporaryVolumeID]
}

vst, err := GenerateVolumeSnapshotTags(backupInfo, pvVolumeMap)
if err != nil {
return snapIDMap, nil, errors.Trace(err)
}
taggingAndFillResult := func(createOutput *ec2.CreateSnapshotsOutput, vst VolumeSnapshotTags, k8sClusterName *string) error {
fillResult := func(createOutput *ec2.CreateSnapshotsOutput) {
mutex.Lock()
defer mutex.Unlock()
for j := range createOutput.Snapshots {
snapshot := createOutput.Snapshots[j]
snapIDMap[aws.StringValue(snapshot.VolumeId)] = aws.StringValue(snapshot.SnapshotId)

createTagInput := &ec2.CreateTagsInput{
Resources: []*string{
snapshot.SnapshotId,
},
Tags: []*ec2.Tag{
ec2Tag(SourcePvcNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourcePVCName),
ec2Tag(SourceVolumeIdKey, aws.StringValue(snapshot.VolumeId)),
ec2Tag(SourceTikvNameKey, vst[aws.StringValue(snapshot.VolumeId)].sourceTiKVName),
ec2Tag(SourceNamespaceKey, vst[aws.StringValue(snapshot.VolumeId)].sourceNameSpace),
ec2Tag(SourceContextKey, aws.StringValue(k8sClusterName)),
},
}
_, err := e.ec2.CreateTags(createTagInput)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

workerPool := utils.NewWorkerPool(e.concurrency, "create snapshots")
Expand Down Expand Up @@ -164,17 +100,6 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str
return snapIDMap, nil, errors.Trace(err)
}

// retrieve the k8s cluster name from EC2 instance tags
var k8sClusterName *string

for j := range resp1.Reservations[0].Instances[0].Tags {
tag := resp1.Reservations[0].Instances[0].Tags[j]
if aws.StringValue(tag.Key) == EC2K8SClusterNameKey {
k8sClusterName = tag.Value
break
}
}

for j := range resp1.Reservations[0].Instances[0].BlockDeviceMappings {
device := resp1.Reservations[0].Instances[0].BlockDeviceMappings[j]
// skip root volume
Expand Down Expand Up @@ -206,15 +131,14 @@ func (e *EC2Session) CreateSnapshots(backupInfo *config.EBSBasedBRMeta) (map[str

createSnapshotInput.SetInstanceSpecification(&instanceSpecification)

// Copy tags from source volume
createSnapshotInput.SetCopyTagsFromSource("volume")
resp, err := e.createSnapshotsWithRetry(context.TODO(), &createSnapshotInput)

if err != nil {
return errors.Trace(err)
}
err = taggingAndFillResult(resp, vst, k8sClusterName)
if err != nil {
return errors.Trace(err)
}

fillResult(resp)
return nil
})
}
Expand Down Expand Up @@ -381,16 +305,6 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
newVolumeIDMap[oldVol.ID] = *newVol.VolumeId
}

fetchTagValue := func(tags []*ec2.Tag, key string) string {
for i := range tags {
tag := tags[i]
if aws.StringValue(tag.Key) == key {
return aws.StringValue(tag.Value)
}
}
return ""
}

workerPool := utils.NewWorkerPool(e.concurrency, "create volume")
for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
Expand All @@ -413,6 +327,7 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
tags := []*ec2.Tag{
ec2Tag("TiDBCluster-BR", "new"),
ec2Tag("ebs.csi.aws.com/cluster", "true"),
ec2Tag("snapshot/createdFromSnapshotId", oldVol.SnapshotID),
}
snapshotIds := make([]*string, 0)

Expand All @@ -425,13 +340,11 @@ func (e *EC2Session) CreateVolumes(meta *config.EBSBasedBRMeta, volumeType strin
return errors.Errorf("specified snapshot [%s] is not found", oldVol.SnapshotID)
}

snapshotTags := resp.Snapshots[0].Tags
tags = append(tags, ec2Tag("snapshot/createdFromSnapshotId", oldVol.SnapshotID),
ec2Tag("snapshot/"+SourcePvcNameKey, fetchTagValue(snapshotTags, SourcePvcNameKey)),
ec2Tag("snapshot/"+SourceVolumeIdKey, fetchTagValue(snapshotTags, SourceVolumeIdKey)),
ec2Tag("snapshot/"+SourceTikvNameKey, fetchTagValue(snapshotTags, SourceTikvNameKey)),
ec2Tag("snapshot/"+SourceNamespaceKey, fetchTagValue(snapshotTags, SourceNamespaceKey)),
ec2Tag("snapshot/"+SourceContextKey, fetchTagValue(snapshotTags, SourceContextKey)))
// Copy tags from source snapshots
for j := range resp.Snapshots[0].Tags {
tags = append(tags,
ec2Tag("snapshot/"+aws.StringValue(resp.Snapshots[0].Tags[j].Key), aws.StringValue(resp.Snapshots[0].Tags[j].Value)))
}

req.SetTagSpecifications([]*ec2.TagSpecification{
{
Expand Down