Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Errkit migration 4 (pkg/blockstorage) #3170

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 44 additions & 43 deletions pkg/blockstorage/awsebs/awsebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package awsebs

import (
"context"
"fmt"
"net/http"
"time"

Expand All @@ -26,7 +27,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/jpillora/backoff"
"github.com/pkg/errors"
"github.com/kanisterio/errkit"

awsconfig "github.com/kanisterio/kanister/pkg/aws"
"github.com/kanisterio/kanister/pkg/blockstorage"
Expand Down Expand Up @@ -71,19 +72,19 @@ func NewProvider(ctx context.Context, config map[string]string) (blockstorage.Pr
}
ec2Cli, err := newEC2Client(region, awsConfig)
if err != nil {
return nil, errors.Wrapf(err, "Could not get EC2 client")
return nil, errkit.Wrap(err, "Could not get EC2 client")
}
return &EbsStorage{Ec2Cli: ec2Cli, Role: config[awsconfig.ConfigRole], config: awsConfig}, nil
}

// newEC2Client returns ec2 client struct.
func newEC2Client(awsRegion string, config *aws.Config) (*EC2, error) {
if config == nil {
return nil, errors.New("Invalid empty AWS config")
return nil, errkit.New("Invalid empty AWS config")
}
s, err := session.NewSession(config)
if err != nil {
return nil, errors.Wrap(err, "Failed to create session for EBS")
return nil, errkit.Wrap(err, "Failed to create session for EBS")
}
conf := config.WithMaxRetries(maxRetries).WithRegion(awsRegion).WithCredentials(config.Credentials)
return &EC2{EC2: ec2.New(s, conf)}, nil
Expand Down Expand Up @@ -124,17 +125,17 @@ func (s *EbsStorage) CheckVolumeCreate(ctx context.Context) (bool, error) {

ec2Cli, err := newEC2Client(*s.config.Region, s.config)
if err != nil {
return false, errors.Wrap(err, "Could not get EC2 client")
return false, errkit.Wrap(err, "Could not get EC2 client")
}
dai := &ec2.DescribeAvailabilityZonesInput{}
az, err := ec2Cli.DescribeAvailabilityZones(dai)
if err != nil {
return false, errors.New("Fail to get available zone for EC2 client")
return false, errkit.New("Fail to get available zone for EC2 client")
}
if az != nil {
zoneName = az.AvailabilityZones[1].ZoneName
} else {
return false, errors.New("No available zone for EC2 client")
return false, errkit.New("No available zone for EC2 client")
}

cvi := &ec2.CreateVolumeInput{
Expand All @@ -144,7 +145,7 @@ func (s *EbsStorage) CheckVolumeCreate(ctx context.Context) (bool, error) {
}
_, err = s.Ec2Cli.CreateVolume(cvi)
if !isDryRunErr(err) {
return false, errors.Wrap(err, "Could not create volume with EC2 client")
return false, errkit.Wrap(err, "Could not create volume with EC2 client")
}
return true, nil
}
Expand All @@ -159,14 +160,14 @@ func (s *EbsStorage) VolumeGet(ctx context.Context, id string, zone string) (*bl
return nil, err
}
if len(dvo.Volumes) != len(volIDs) {
return nil, errors.New("Object not found")
return nil, errkit.New("Object not found")
}
vols := dvo.Volumes
if len(vols) == 0 {
return nil, errors.New("Volume with volume_id: " + id + " not found")
return nil, errkit.New("Volume with volume_id: " + id + " not found")
}
if len(vols) > 1 {
return nil, errors.Errorf("Found an unexpected number of volumes: volume_id=%s result_count=%d", id, len(vols))
return nil, errkit.New(fmt.Sprintf("Found an unexpected number of volumes: volume_id=%s result_count=%d", id, len(vols)))
}
vol := vols[0]
mv := s.volumeParse(ctx, vol)
Expand Down Expand Up @@ -268,15 +269,15 @@ func (s *EbsStorage) SnapshotsList(ctx context.Context, tags map[string]string)
// i.e., copying unencrypted to encrypted snapshot is allowed but not vice versa.
func (s *EbsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Snapshot) (*blockstorage.Snapshot, error) {
if to.Region == "" {
return nil, errors.New("Destination snapshot AvailabilityZone must be specified")
return nil, errkit.New("Destination snapshot AvailabilityZone must be specified")
}
if to.ID != "" {
return nil, errors.Errorf("Snapshot %v destination ID must be empty", to)
return nil, errkit.New(fmt.Sprintf("Snapshot %v destination ID must be empty", to))
}
// Copy operation must be initiated from the destination region.
ec2Cli, err := newEC2Client(to.Region, s.Ec2Cli.Config.Copy())
if err != nil {
return nil, errors.Wrapf(err, "Could not get EC2 client")
return nil, errkit.Wrap(err, "Could not get EC2 client")
}
// Include a presigned URL when the regions are different. Include it
// independent of whether or not the snapshot is encrypted.
Expand All @@ -290,7 +291,7 @@ func (s *EbsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna
rq, _ := ec2Cli.CopySnapshotRequest(&si)
su, err2 := rq.Presign(120 * time.Minute)
if err2 != nil {
return nil, errors.Wrap(err2, "Could not presign URL for snapshot copy request")
return nil, errkit.Wrap(err2, "Could not presign URL for snapshot copy request")
}
presignedURL = &su
}
Expand All @@ -316,14 +317,14 @@ func (s *EbsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna
}
cso, err := ec2Cli.CopySnapshotWithContext(ctx, &csi)
if err != nil {
return nil, errors.Wrapf(err, "Failed to copy snapshot %v", csi)
return nil, errkit.Wrap(err, "Failed to copy snapshot", "snapshot", csi)
}
snapID := aws.StringValue(cso.SnapshotId)
if err = setResourceTags(ctx, ec2Cli, snapID, ktags.GetTags(tags)); err != nil {
return nil, err
}
if err = waitOnSnapshotID(ctx, ec2Cli, snapID); err != nil {
return nil, errors.Wrapf(err, "Snapshot %s did not complete", snapID)
return nil, errkit.Wrap(err, "Snapshot did not complete", "snapshot", snapID)
}
snaps, err := getSnapshots(ctx, ec2Cli, []*string{aws.String(snapID)})
if err != nil {
Expand All @@ -341,7 +342,7 @@ func (s *EbsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Sna

// SnapshotCopyWithArgs is part of blockstorage.Provider
func (s *EbsStorage) SnapshotCopyWithArgs(ctx context.Context, from blockstorage.Snapshot, to blockstorage.Snapshot, args map[string]string) (*blockstorage.Snapshot, error) {
return nil, errors.New("Copy Snapshot with Args not implemented")
return nil, errkit.New("Copy Snapshot with Args not implemented")
}

// SnapshotCreate is part of blockstorage.Provider
Expand All @@ -358,7 +359,7 @@ func (s *EbsStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Vol
csi.SetDryRun(s.Ec2Cli.DryRun)
snap, err := s.Ec2Cli.CreateSnapshotWithContext(ctx, csi)
if err != nil && !isDryRunErr(err) {
return nil, errors.Wrapf(err, "Failed to create snapshot, volume_id: %s", *csi.VolumeId)
return nil, errkit.Wrap(err, "Failed to create snapshot", "volume_id", *csi.VolumeId)
}

region, err := availabilityZoneToRegion(ctx, s.Ec2Cli, volume.Az)
Expand All @@ -381,7 +382,7 @@ func (s *EbsStorage) SnapshotCreateWaitForCompletion(ctx context.Context, snap *
return nil
}
if err := waitOnSnapshotID(ctx, s.Ec2Cli, snap.ID); err != nil {
return errors.Wrapf(err, "Waiting on snapshot %v", snap)
return errkit.Wrap(err, "Waiting on snapshot", "snapshot", snap)
}
return nil
}
Expand All @@ -399,7 +400,7 @@ func (s *EbsStorage) SnapshotDelete(ctx context.Context, snapshot *blockstorage.
return nil
}
if err != nil && !isDryRunErr(err) {
return errors.Wrap(err, "Failed to delete snapshot")
return errkit.Wrap(err, "Failed to delete snapshot")
}
return nil
}
Expand Down Expand Up @@ -431,7 +432,7 @@ func (s *EbsStorage) VolumeDelete(ctx context.Context, volume *blockstorage.Volu
return nil
}
if err != nil && !isDryRunErr(err) {
return errors.Wrapf(err, "Failed to delete volume volID: %s", volume.ID)
return errkit.Wrap(err, "Failed to delete volume", "volID", volume.ID)
}
return nil
}
Expand All @@ -444,26 +445,26 @@ func (s *EbsStorage) SetTags(ctx context.Context, resource interface{}, tags map
case *blockstorage.Snapshot:
return setResourceTags(ctx, s.Ec2Cli, res.ID, tags)
default:
return errors.Wrapf(nil, "Unknown resource type: %v", res)
return errkit.Wrap(nil, "Unknown resource type", "resourceType", res)
}
}

// setResourceTags sets tags on the specified resource
func setResourceTags(ctx context.Context, ec2Cli *EC2, resourceID string, tags map[string]string) error {
cti := &ec2.CreateTagsInput{Resources: []*string{&resourceID}, Tags: mapToEC2Tags(tags)}
if _, err := ec2Cli.CreateTags(cti); err != nil {
return errors.Wrapf(err, "Failed to set tags, resource_id:%s", resourceID)
return errkit.Wrap(err, "Failed to set tags", "resource_id", resourceID)
}
return nil
}

// VolumeCreateFromSnapshot is part of blockstorage.Provider
func (s *EbsStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) {
if snapshot.Volume == nil {
return nil, errors.New("Snapshot volume information not available")
return nil, errkit.New("Snapshot volume information not available")
}
if snapshot.Volume.VolumeType == "" || snapshot.Volume.Az == "" || snapshot.Volume.Tags == nil {
return nil, errors.Errorf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags)
return nil, errkit.New(fmt.Sprintf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags))
}
kubeCli, err := kube.NewClient()
if err != nil {
Expand All @@ -474,7 +475,7 @@ func (s *EbsStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot bloc
return nil, err
}
if len(zones) != 1 {
return nil, errors.Errorf("Length of zone slice should be 1, got %d", len(zones))
return nil, errkit.New(fmt.Sprintf("Length of zone slice should be 1, got %d", len(zones)))
}
cvi := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zones[0]),
Expand All @@ -495,7 +496,7 @@ func (s *EbsStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot bloc
volID, err := createVolume(ctx, s.Ec2Cli, cvi, ktags.GetTags(tags))
if err != nil {
if isVolNotFoundErr(err) {
return nil, errors.Wrap(err, "This may indicate insufficient permissions for KMS keys.")
return nil, errkit.Wrap(err, "This may indicate insufficient permissions for KMS keys.")
}
return nil, err
}
Expand Down Expand Up @@ -530,13 +531,13 @@ func getSnapshots(ctx context.Context, ec2Cli *EC2, snapIDs []*string) ([]*ec2.S
dsi := &ec2.DescribeSnapshotsInput{SnapshotIds: snapIDs}
dso, err := ec2Cli.DescribeSnapshotsWithContext(ctx, dsi)
if err != nil {
return nil, errors.Wrapf(err, blockstorage.SnapshotDoesNotExistError+", snapshot_ids: %p", snapIDs)
return nil, errkit.Wrap(err, blockstorage.SnapshotDoesNotExistError+", snapshot_ids: %p", snapIDs)
}
// TODO: handle paging and continuation
if len(dso.Snapshots) != len(snapIDs) {
log.Error().Print("Did not find all requested snapshots", field.M{"snapshots_requested": snapIDs, "snapshots_found": dso.Snapshots})
// TODO: Move mapping to HTTP error to the caller
return nil, errors.New(blockstorage.SnapshotDoesNotExistError)
return nil, errkit.New(blockstorage.SnapshotDoesNotExistError)
}
return dso.Snapshots, nil
}
Expand All @@ -549,11 +550,11 @@ func availabilityZoneToRegion(ctx context.Context, awsCli *EC2, az string) (ar s

azo, err := awsCli.DescribeAvailabilityZonesWithContext(ctx, azi)
if err != nil {
return "", errors.Wrapf(err, "Could not determine region for availability zone (AZ) %s", az)
return "", errkit.Wrap(err, "Could not determine region for availability zone (AZ)", "az", az)
}

if len(azo.AvailabilityZones) == 0 {
return "", errors.New("Region unavailable for availability zone" + az)
return "", errkit.New("Region unavailable for availability zone" + az)
}

return aws.StringValue(azo.AvailabilityZones[0].RegionName), nil
Expand Down Expand Up @@ -585,11 +586,11 @@ func waitOnVolume(ctx context.Context, ec2Cli *EC2, vol *ec2.Volume) error {
return err
}
if len(dvo.Volumes) != 1 {
return errors.New("Object not found")
return errkit.New("Object not found")
}
s := dvo.Volumes[0]
if *s.State == ec2.VolumeStateError {
return errors.New("Creating EBS volume failed")
return errkit.New("Creating EBS volume failed")
}
if *s.State == ec2.VolumeStateAvailable {
log.Print("Volume creation complete", field.M{"VolumeID": *vol.VolumeId})
Expand All @@ -612,14 +613,14 @@ func waitOnSnapshotID(ctx context.Context, ec2Cli *EC2, snapID string) error {
return poll.WaitWithBackoff(ctx, snapWaitBackoff, func(ctx context.Context) (bool, error) {
dso, err := ec2Cli.DescribeSnapshotsWithContext(ctx, dsi)
if err != nil {
return false, errors.Wrapf(err, "Failed to describe snapshot, snapshot_id: %s", snapID)
return false, errkit.Wrap(err, "Failed to describe snapshot", "snapshot_id", snapID)
}
if len(dso.Snapshots) != 1 {
return false, errors.New(blockstorage.SnapshotDoesNotExistError)
return false, errkit.New(blockstorage.SnapshotDoesNotExistError)
}
s := dso.Snapshots[0]
if *s.State == ec2.SnapshotStateError {
return false, errors.New("Snapshot EBS volume failed")
return false, errkit.New("Snapshot EBS volume failed")
}
if *s.State == ec2.SnapshotStateCompleted {
log.Print("Snapshot completed", field.M{"SnapshotID": snapID})
Expand All @@ -644,14 +645,14 @@ func GetRegionFromEC2Metadata() (string, error) {
ec2MetaData := ec2metadata.New(session.Must(session.NewSession()), &conf)

awsRegion, err := ec2MetaData.Region()
return awsRegion, errors.Wrap(err, "Failed to get AWS Region")
return awsRegion, errkit.Wrap(err, "Failed to get AWS Region")
}

// FromRegion is part of zone.Mapper
func (s *EbsStorage) FromRegion(ctx context.Context, region string) ([]string, error) {
ec2Cli, err := newEC2Client(region, s.config)
if err != nil {
return nil, errors.Wrapf(err, "Could not get EC2 client while fetching zones FromRegion (%s)", region)
return nil, errkit.Wrap(err, "Could not get EC2 client while fetching zones FromRegion", "region", region)
}
trueBool := true
filterKey := "region-name"
Expand All @@ -662,7 +663,7 @@ func (s *EbsStorage) FromRegion(ctx context.Context, region string) ([]string, e
},
})
if err != nil {
return nil, errors.Wrapf(err, "Failed to get availability zones for region %s", region)
return nil, errkit.Wrap(err, "Failed to get availability zones for region", "region", region)
}
zoneList := []string{}
for _, zone := range zones.AvailabilityZones {
Expand All @@ -675,7 +676,7 @@ func (s *EbsStorage) GetRegions(ctx context.Context) ([]string, error) {
trueBool := true
result, err := s.Ec2Cli.DescribeRegions(&ec2.DescribeRegionsInput{AllRegions: &trueBool})
if err != nil {
return nil, errors.Wrap(err, "Failed to describe regions")
return nil, errkit.Wrap(err, "Failed to describe regions")
}
regions := []string{}

Expand All @@ -689,10 +690,10 @@ func (s *EbsStorage) GetRegions(ctx context.Context) ([]string, error) {
func (s *EbsStorage) SnapshotRestoreTargets(ctx context.Context, snapshot *blockstorage.Snapshot) (global bool, regionsAndZones map[string][]string, err error) {
// A few checks from VolumeCreateFromSnapshot
if snapshot.Volume == nil {
return false, nil, errors.New("Snapshot volume information not available")
return false, nil, errkit.New("Snapshot volume information not available")
}
if snapshot.Volume.VolumeType == "" || snapshot.Volume.Az == "" || snapshot.Volume.Tags == nil {
return false, nil, errors.Errorf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags)
return false, nil, errkit.New(fmt.Sprintf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags))
}
// EBS snapshots can only be restored in their region
zl, err := s.FromRegion(ctx, snapshot.Region)
Expand Down
Loading