Skip to content

Commit

Permalink
wip remove backupservice
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
  • Loading branch information
ncdc committed May 18, 2018
1 parent 693e185 commit fe2fb39
Show file tree
Hide file tree
Showing 28 changed files with 806 additions and 559 deletions.
9 changes: 7 additions & 2 deletions pkg/backup/backup_pv_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ type backupPVAction struct {
log logrus.FieldLogger
}

func NewBackupPVAction(log logrus.FieldLogger) ItemAction {
return &backupPVAction{log: log}
// NewBackupPVAction creates a new ItemAction for persistent volumes. The
// logger is intentionally not set here; it is injected later via SetLog when
// the action is initialized as the server half of a plugin.
func NewBackupPVAction() ItemAction {
return &backupPVAction{}
}

// SetLog is called when this is initialized as part of the server half of a plugin.
// It stores the provided logger for use by the action's other methods.
func (a *backupPVAction) SetLog(log logrus.FieldLogger) {
a.log = log
}

func (a *backupPVAction) AppliesTo() (ResourceSelector, error) {
Expand Down
8 changes: 6 additions & 2 deletions pkg/backup/pod_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ type podAction struct {
}

// NewPodAction creates a new ItemAction for pods.
func NewPodAction(log logrus.FieldLogger) ItemAction {
return &podAction{log: log}
// NewPodAction creates a new ItemAction for pods. The logger is intentionally
// not set here; it is injected later via SetLog.
func NewPodAction() ItemAction {
return &podAction{}
}

// SetLog stores the provided logger for use by the action's other methods.
func (a *podAction) SetLog(log logrus.FieldLogger) {
a.log = log
}

// AppliesTo returns a ResourceSelector that applies only to pods.
Expand Down
7 changes: 5 additions & 2 deletions pkg/backup/service_account_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ type serviceAccountAction struct {
}

// NewServiceAccountAction creates a new ItemAction for service accounts.
func NewServiceAccountAction(log logrus.FieldLogger, client rbacclient.ClusterRoleBindingInterface) (ItemAction, error) {
func NewServiceAccountAction(client rbacclient.ClusterRoleBindingInterface) (ItemAction, error) {
clusterRoleBindings, err := client.List(metav1.ListOptions{})
if err != nil {
return nil, errors.WithStack(err)
}

return &serviceAccountAction{
log: log,
clusterRoleBindings: clusterRoleBindings.Items,
}, nil
}

// SetLog stores the provided logger for use by the action's other methods.
func (a *serviceAccountAction) SetLog(log logrus.FieldLogger) {
a.log = log
}

// AppliesTo returns a ResourceSelector that applies only to service accounts.
func (a *serviceAccountAction) AppliesTo() (ResourceSelector, error) {
return ResourceSelector{
Expand Down
10 changes: 6 additions & 4 deletions pkg/cloudprovider/aws/object_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ type objectStore struct {
kmsKeyID string
}

func NewObjectStore(log logrus.FieldLogger) cloudprovider.ObjectStore {
return &objectStore{
log: log,
}
// NewObjectStore returns an AWS-backed cloudprovider.ObjectStore. The logger
// is injected separately via SetLog, and configuration is supplied via Init.
func NewObjectStore() cloudprovider.ObjectStore {
return &objectStore{}
}

// SetLog stores the provided logger for use by the object store's other methods.
func (o *objectStore) SetLog(log logrus.FieldLogger) {
o.log = log
}

func (o *objectStore) Init(config map[string]string) error {
Expand Down
192 changes: 68 additions & 124 deletions pkg/cloudprovider/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package cloudprovider

import (
"context"
"fmt"
"io"
"io/ioutil"
Expand All @@ -26,43 +25,12 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"

api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/generated/clientset/versioned/scheme"
)

// BackupService contains methods for working with backups in object storage.
type BackupService interface {
BackupGetter
// UploadBackup uploads the specified Ark backup of a set of Kubernetes API objects, whose manifests are
// stored in the specified file, into object storage in an Ark bucket, tagged with Ark metadata. Returns
// an error if a problem is encountered accessing the file or performing the upload via the cloud API.
UploadBackup(bucket, name string, metadata, backup, log io.Reader) error

// DownloadBackup downloads an Ark backup with the specified object key from object storage via the cloud API.
// It returns the snapshot metadata and data (separately), or an error if a problem is encountered
// downloading or reading the file from the cloud API.
DownloadBackup(bucket, name string) (io.ReadCloser, error)

// DeleteBackupDir deletes all files in object storage for the given backup.
DeleteBackupDir(bucket, backupName string) error

// GetBackup gets the specified api.Backup from the given bucket in object storage.
GetBackup(bucket, name string) (*api.Backup, error)

// CreateSignedURL creates a pre-signed URL that can be used to download a file from object
// storage. The URL expires after ttl.
CreateSignedURL(target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error)

// UploadRestoreLog uploads the restore's log file to object storage.
UploadRestoreLog(bucket, backup, restore string, log io.Reader) error

// UploadRestoreResults uploads the restore's results file to object storage.
UploadRestoreResults(bucket, backup, restore string, results io.Reader) error
}

// BackupGetter knows how to list backups in object storage.
type BackupGetter interface {
// GetAllBackups lists all the api.Backups in object storage for the given bucket.
Expand Down Expand Up @@ -97,24 +65,6 @@ func getRestoreResultsKey(directory, restore string) string {
return fmt.Sprintf(restoreResultsFileFormatString, directory, restore)
}

type backupService struct {
objectStore ObjectStore
decoder runtime.Decoder
logger logrus.FieldLogger
}

var _ BackupService = &backupService{}
var _ BackupGetter = &backupService{}

// NewBackupService creates a backup service using the provided object store
func NewBackupService(objectStore ObjectStore, logger logrus.FieldLogger) BackupService {
return &backupService{
objectStore: objectStore,
decoder: scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion),
logger: logger,
}
}

func seekToBeginning(r io.Reader) error {
seeker, ok := r.(io.Seeker)
if !ok {
Expand All @@ -125,7 +75,7 @@ func seekToBeginning(r io.Reader) error {
return err
}

func (br *backupService) seekAndPutObject(bucket, key string, file io.Reader) error {
func seekAndPutObject(objectStore ObjectStore, bucket, key string, file io.Reader) error {
if file == nil {
return nil
}
Expand All @@ -134,53 +84,40 @@ func (br *backupService) seekAndPutObject(bucket, key string, file io.Reader) er
return errors.WithStack(err)
}

return br.objectStore.PutObject(bucket, key, file)
return objectStore.PutObject(bucket, key, file)
}

func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error {
// Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the
// backup's status.
func UploadBackupLog(objectStore ObjectStore, bucket, backupName string, log io.Reader) error {
logKey := getBackupLogKey(backupName, backupName)
if err := br.seekAndPutObject(bucket, logKey, log); err != nil {
br.logger.WithError(err).WithFields(logrus.Fields{
"bucket": bucket,
"key": logKey,
}).Error("Error uploading log file")
}

if metadata == nil {
// If we don't have metadata, something failed, and there's no point in continuing. An object
// storage bucket that is missing the metadata file can't be restored, nor can its logs be
// viewed.
return nil
}
return seekAndPutObject(objectStore, bucket, logKey, log)
}

// upload metadata file
func UploadBackupMetadata(objectStore ObjectStore, bucket, backupName string, metadata io.Reader) error {
metadataKey := getMetadataKey(backupName)
if err := br.seekAndPutObject(bucket, metadataKey, metadata); err != nil {
// failure to upload metadata file is a hard-stop
return err
}
return seekAndPutObject(objectStore, bucket, metadataKey, metadata)
}

if backup != nil {
// upload tar file
if err := br.seekAndPutObject(bucket, getBackupContentsKey(backupName, backupName), backup); err != nil {
// try to delete the metadata file since the data upload failed
deleteErr := br.objectStore.DeleteObject(bucket, metadataKey)
func UploadBackup(objectStore ObjectStore, bucket, backupName string, backup io.Reader) error {
if err := seekAndPutObject(objectStore, bucket, getBackupContentsKey(backupName, backupName), backup); err != nil {
// try to delete the metadata file since the data upload failed
metadataKey := getMetadataKey(backupName)
deleteErr := objectStore.DeleteObject(bucket, metadataKey)

return kerrors.NewAggregate([]error{err, deleteErr})
}
return kerrors.NewAggregate([]error{err, deleteErr})
}

return nil
}

func (br *backupService) DownloadBackup(bucket, backupName string) (io.ReadCloser, error) {
return br.objectStore.GetObject(bucket, getBackupContentsKey(backupName, backupName))
// DownloadBackup downloads the contents of the Ark backup with the specified
// name from the given bucket in object storage via the cloud API. It returns
// a ReadCloser for the backup contents tarball, or an error if a problem is
// encountered downloading the file from the cloud API. The caller is
// responsible for closing the returned ReadCloser.
func DownloadBackup(objectStore ObjectStore, bucket, backupName string) (io.ReadCloser, error) {
return objectStore.GetObject(bucket, getBackupContentsKey(backupName, backupName))
}

func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
prefixes, err := br.objectStore.ListCommonPrefixes(bucket, "/")
func GetAllBackups(logger logrus.FieldLogger, objectStore ObjectStore, bucket string) ([]*api.Backup, error) {
prefixes, err := objectStore.ListCommonPrefixes(bucket, "/")
if err != nil {
return nil, err
}
Expand All @@ -191,9 +128,9 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
output := make([]*api.Backup, 0, len(prefixes))

for _, backupDir := range prefixes {
backup, err := br.GetBackup(bucket, backupDir)
backup, err := GetBackup(objectStore, bucket, backupDir)
if err != nil {
br.logger.WithError(err).WithField("dir", backupDir).Error("Error reading backup directory")
logger.WithError(err).WithField("dir", backupDir).Error("Error reading backup directory")
continue
}

Expand All @@ -203,10 +140,11 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) {
return output, nil
}

func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, error) {
// GetBackup gets the specified api.Backup from the given bucket in object storage.
func GetBackup(objectStore ObjectStore, bucket, backupName string) (*api.Backup, error) {
key := getMetadataKey(backupName)

res, err := br.objectStore.GetObject(bucket, key)
res, err := objectStore.GetObject(bucket, key)
if err != nil {
return nil, err
}
Expand All @@ -217,7 +155,8 @@ func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, erro
return nil, errors.WithStack(err)
}

obj, _, err := br.decoder.Decode(data, nil, nil)
decoder := scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion)
obj, _, err := decoder.Decode(data, nil, nil)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -230,71 +169,76 @@ func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, erro
return backup, nil
}

func (br *backupService) DeleteBackupDir(bucket, backupName string) error {
objects, err := br.objectStore.ListObjects(bucket, backupName+"/")
// DeleteBackupDir deletes all files in object storage for the given backup.
func DeleteBackupDir(logger logrus.FieldLogger, objectStore ObjectStore, bucket, backupName string) error {
objects, err := objectStore.ListObjects(bucket, backupName+"/")
if err != nil {
return err
}

var errs []error
for _, key := range objects {
br.logger.WithFields(logrus.Fields{
logger.WithFields(logrus.Fields{
"bucket": bucket,
"key": key,
}).Debug("Trying to delete object")
if err := br.objectStore.DeleteObject(bucket, key); err != nil {
if err := objectStore.DeleteObject(bucket, key); err != nil {
errs = append(errs, err)
}
}

return errors.WithStack(kerrors.NewAggregate(errs))
}

func (br *backupService) CreateSignedURL(target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) {
// CreateSignedURL creates a pre-signed URL that can be used to download a file from object
// storage. The URL expires after ttl.
func CreateSignedURL(objectStore ObjectStore, target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) {
switch target.Kind {
case api.DownloadTargetKindBackupContents:
return br.objectStore.CreateSignedURL(bucket, getBackupContentsKey(directory, target.Name), ttl)
return objectStore.CreateSignedURL(bucket, getBackupContentsKey(directory, target.Name), ttl)
case api.DownloadTargetKindBackupLog:
return br.objectStore.CreateSignedURL(bucket, getBackupLogKey(directory, target.Name), ttl)
return objectStore.CreateSignedURL(bucket, getBackupLogKey(directory, target.Name), ttl)
case api.DownloadTargetKindRestoreLog:
return br.objectStore.CreateSignedURL(bucket, getRestoreLogKey(directory, target.Name), ttl)
return objectStore.CreateSignedURL(bucket, getRestoreLogKey(directory, target.Name), ttl)
case api.DownloadTargetKindRestoreResults:
return br.objectStore.CreateSignedURL(bucket, getRestoreResultsKey(directory, target.Name), ttl)
return objectStore.CreateSignedURL(bucket, getRestoreResultsKey(directory, target.Name), ttl)
default:
return "", errors.Errorf("unsupported download target kind %q", target.Kind)
}
}

func (br *backupService) UploadRestoreLog(bucket, backup, restore string, log io.Reader) error {
// UploadRestoreLog uploads the restore's log file to object storage.
func UploadRestoreLog(objectStore ObjectStore, bucket, backup, restore string, log io.Reader) error {
key := getRestoreLogKey(backup, restore)
return br.objectStore.PutObject(bucket, key, log)
return objectStore.PutObject(bucket, key, log)
}

func (br *backupService) UploadRestoreResults(bucket, backup, restore string, results io.Reader) error {
// UploadRestoreResults uploads the restore's results file to object storage.
func UploadRestoreResults(objectStore ObjectStore, bucket, backup, restore string, results io.Reader) error {
key := getRestoreResultsKey(backup, restore)
return br.objectStore.PutObject(bucket, key, results)
return objectStore.PutObject(bucket, key, results)
}

// cachedBackupService wraps a real backup service with a cache for getting cloud backups.
type cachedBackupService struct {
BackupService
cache BackupGetter
}

// NewBackupServiceWithCachedBackupGetter returns a BackupService that uses a cache for
// GetAllBackups().
func NewBackupServiceWithCachedBackupGetter(
ctx context.Context,
delegate BackupService,
resyncPeriod time.Duration,
logger logrus.FieldLogger,
) BackupService {
return &cachedBackupService{
BackupService: delegate,
cache: NewBackupCache(ctx, delegate, resyncPeriod, logger),
}
}

func (c *cachedBackupService) GetAllBackups(bucketName string) ([]*api.Backup, error) {
return c.cache.GetAllBackups(bucketName)
}
// type cachedBackupService struct {
// BackupService
// cache BackupGetter
// }

// // NewBackupServiceWithCachedBackupGetter returns a BackupService that uses a cache for
// // GetAllBackups().
// func NewBackupServiceWithCachedBackupGetter(
// ctx context.Context,
// delegate BackupService,
// resyncPeriod time.Duration,
// logger logrus.FieldLogger,
// ) BackupService {
// return &cachedBackupService{
// BackupService: delegate,
// cache: NewBackupCache(ctx, delegate, resyncPeriod, logger),
// }
// }

// func (c *cachedBackupService) GetAllBackups(bucketName string) ([]*api.Backup, error) {
// return c.cache.GetAllBackups(bucketName)
// }
8 changes: 6 additions & 2 deletions pkg/cloudprovider/gcp/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ type blockStore struct {
log logrus.FieldLogger
}

func NewBlockStore(log logrus.FieldLogger) cloudprovider.BlockStore {
return &blockStore{log: log}
// NewBlockStore returns a GCP-backed cloudprovider.BlockStore. The logger is
// injected separately via SetLog, and configuration is supplied via Init.
func NewBlockStore() cloudprovider.BlockStore {
return &blockStore{}
}

// SetLog stores the provided logger for use by the block store's other methods.
func (b *blockStore) SetLog(log logrus.FieldLogger) {
b.log = log
}

func (b *blockStore) Init(config map[string]string) error {
Expand Down
Loading

0 comments on commit fe2fb39

Please sign in to comment.