diff --git a/pkg/backup/backup_pv_action.go b/pkg/backup/backup_pv_action.go index 684e2bfa65..432b71d6f1 100644 --- a/pkg/backup/backup_pv_action.go +++ b/pkg/backup/backup_pv_action.go @@ -32,8 +32,13 @@ type backupPVAction struct { log logrus.FieldLogger } -func NewBackupPVAction(log logrus.FieldLogger) ItemAction { - return &backupPVAction{log: log} +func NewBackupPVAction() ItemAction { + return &backupPVAction{} +} + +// SetLog is called when this is initialized as part of the server half of a plugin. +func (a *backupPVAction) SetLog(log logrus.FieldLogger) { + a.log = log } func (a *backupPVAction) AppliesTo() (ResourceSelector, error) { diff --git a/pkg/backup/pod_action.go b/pkg/backup/pod_action.go index e940bc4376..a9901aa3f4 100644 --- a/pkg/backup/pod_action.go +++ b/pkg/backup/pod_action.go @@ -34,8 +34,12 @@ type podAction struct { } // NewPodAction creates a new ItemAction for pods. -func NewPodAction(log logrus.FieldLogger) ItemAction { - return &podAction{log: log} +func NewPodAction() ItemAction { + return &podAction{} +} + +func (a *podAction) SetLog(log logrus.FieldLogger) { + a.log = log } // AppliesTo returns a ResourceSelector that applies only to pods. diff --git a/pkg/backup/service_account_action.go b/pkg/backup/service_account_action.go index 7a330723f5..251570235a 100644 --- a/pkg/backup/service_account_action.go +++ b/pkg/backup/service_account_action.go @@ -38,18 +38,21 @@ type serviceAccountAction struct { } // NewServiceAccountAction creates a new ItemAction for service accounts. -func NewServiceAccountAction(log logrus.FieldLogger, client rbacclient.ClusterRoleBindingInterface) (ItemAction, error) { +func NewServiceAccountAction(client rbacclient.ClusterRoleBindingInterface) (ItemAction, error) { clusterRoleBindings, err := client.List(metav1.ListOptions{}) if err != nil { return nil, errors.WithStack(err) } return &serviceAccountAction{ - log: log, clusterRoleBindings: clusterRoleBindings.Items, }, nil } +func (a *serviceAccountAction) SetLog(log logrus.FieldLogger) { + a.log = log +} + // AppliesTo returns a ResourceSelector that applies only to service accounts. func (a *serviceAccountAction) AppliesTo() (ResourceSelector, error) { return ResourceSelector{ diff --git a/pkg/cloudprovider/aws/object_store.go b/pkg/cloudprovider/aws/object_store.go index 242c655354..2ab719c682 100644 --- a/pkg/cloudprovider/aws/object_store.go +++ b/pkg/cloudprovider/aws/object_store.go @@ -44,10 +44,12 @@ type objectStore struct { kmsKeyID string } -func NewObjectStore(log logrus.FieldLogger) cloudprovider.ObjectStore { - return &objectStore{ - log: log, - } +func NewObjectStore() cloudprovider.ObjectStore { + return &objectStore{} +} + +func (o *objectStore) SetLog(log logrus.FieldLogger) { + o.log = log } func (o *objectStore) Init(config map[string]string) error { diff --git a/pkg/cloudprovider/backup_service.go b/pkg/cloudprovider/backup_service.go index 5b4f1e4271..a05d5483b0 100644 --- a/pkg/cloudprovider/backup_service.go +++ b/pkg/cloudprovider/backup_service.go @@ -17,7 +17,6 @@ limitations under the License. package cloudprovider import ( - "context" "fmt" "io" "io/ioutil" @@ -26,43 +25,12 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" api "github.com/heptio/ark/pkg/apis/ark/v1" "github.com/heptio/ark/pkg/generated/clientset/versioned/scheme" ) -// BackupService contains methods for working with backups in object storage. -type BackupService interface { - BackupGetter - // UploadBackup uploads the specified Ark backup of a set of Kubernetes API objects, whose manifests are - // stored in the specified file, into object storage in an Ark bucket, tagged with Ark metadata. Returns - // an error if a problem is encountered accessing the file or performing the upload via the cloud API. - UploadBackup(bucket, name string, metadata, backup, log io.Reader) error - - // DownloadBackup downloads an Ark backup with the specified object key from object storage via the cloud API. - // It returns the snapshot metadata and data (separately), or an error if a problem is encountered - // downloading or reading the file from the cloud API. - DownloadBackup(bucket, name string) (io.ReadCloser, error) - - // DeleteBackupDir deletes all files in object storage for the given backup. - DeleteBackupDir(bucket, backupName string) error - - // GetBackup gets the specified api.Backup from the given bucket in object storage. - GetBackup(bucket, name string) (*api.Backup, error) - - // CreateSignedURL creates a pre-signed URL that can be used to download a file from object - // storage. The URL expires after ttl. - CreateSignedURL(target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) - - // UploadRestoreLog uploads the restore's log file to object storage. - UploadRestoreLog(bucket, backup, restore string, log io.Reader) error - - // UploadRestoreResults uploads the restore's results file to object storage. - UploadRestoreResults(bucket, backup, restore string, results io.Reader) error -} - // BackupGetter knows how to list backups in object storage. type BackupGetter interface { // GetAllBackups lists all the api.Backups in object storage for the given bucket. @@ -97,24 +65,6 @@ func getRestoreResultsKey(directory, restore string) string { return fmt.Sprintf(restoreResultsFileFormatString, directory, restore) } -type backupService struct { - objectStore ObjectStore - decoder runtime.Decoder - logger logrus.FieldLogger -} - -var _ BackupService = &backupService{} -var _ BackupGetter = &backupService{} - -// NewBackupService creates a backup service using the provided object store -func NewBackupService(objectStore ObjectStore, logger logrus.FieldLogger) BackupService { - return &backupService{ - objectStore: objectStore, - decoder: scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion), - logger: logger, - } -} - func seekToBeginning(r io.Reader) error { seeker, ok := r.(io.Seeker) if !ok { @@ -125,7 +75,7 @@ func seekToBeginning(r io.Reader) error { return err } -func (br *backupService) seekAndPutObject(bucket, key string, file io.Reader) error { +func seekAndPutObject(objectStore ObjectStore, bucket, key string, file io.Reader) error { if file == nil { return nil } @@ -134,53 +84,40 @@ func (br *backupService) seekAndPutObject(bucket, key string, file io.Reader) er return errors.WithStack(err) } - return br.objectStore.PutObject(bucket, key, file) + return objectStore.PutObject(bucket, key, file) } -func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error { - // Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the - // backup's status. +func UploadBackupLog(objectStore ObjectStore, bucket, backupName string, log io.Reader) error { logKey := getBackupLogKey(backupName, backupName) - if err := br.seekAndPutObject(bucket, logKey, log); err != nil { - br.logger.WithError(err).WithFields(logrus.Fields{ - "bucket": bucket, - "key": logKey, - }).Error("Error uploading log file") - } - - if metadata == nil { - // If we don't have metadata, something failed, and there's no point in continuing. An object - // storage bucket that is missing the metadata file can't be restored, nor can its logs be - // viewed. - return nil - } + return seekAndPutObject(objectStore, bucket, logKey, log) +} - // upload metadata file +func UploadBackupMetadata(objectStore ObjectStore, bucket, backupName string, metadata io.Reader) error { metadataKey := getMetadataKey(backupName) - if err := br.seekAndPutObject(bucket, metadataKey, metadata); err != nil { - // failure to upload metadata file is a hard-stop - return err - } + return seekAndPutObject(objectStore, bucket, metadataKey, metadata) +} - if backup != nil { - // upload tar file - if err := br.seekAndPutObject(bucket, getBackupContentsKey(backupName, backupName), backup); err != nil { - // try to delete the metadata file since the data upload failed - deleteErr := br.objectStore.DeleteObject(bucket, metadataKey) +func UploadBackup(objectStore ObjectStore, bucket, backupName string, backup io.Reader) error { + if err := seekAndPutObject(objectStore, bucket, getBackupContentsKey(backupName, backupName), backup); err != nil { + // try to delete the metadata file since the data upload failed + metadataKey := getMetadataKey(backupName) + deleteErr := objectStore.DeleteObject(bucket, metadataKey) - return kerrors.NewAggregate([]error{err, deleteErr}) - } + return kerrors.NewAggregate([]error{err, deleteErr}) } return nil } -func (br *backupService) DownloadBackup(bucket, backupName string) (io.ReadCloser, error) { - return br.objectStore.GetObject(bucket, getBackupContentsKey(backupName, backupName)) +// DownloadBackup downloads an Ark backup with the specified object key from object storage via the cloud API. +// It returns the snapshot metadata and data (separately), or an error if a problem is encountered +// downloading or reading the file from the cloud API. +func DownloadBackup(objectStore ObjectStore, bucket, backupName string) (io.ReadCloser, error) { + return objectStore.GetObject(bucket, getBackupContentsKey(backupName, backupName)) } -func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) { - prefixes, err := br.objectStore.ListCommonPrefixes(bucket, "/") +func GetAllBackups(logger logrus.FieldLogger, objectStore ObjectStore, bucket string) ([]*api.Backup, error) { + prefixes, err := objectStore.ListCommonPrefixes(bucket, "/") if err != nil { return nil, err } @@ -191,9 +128,9 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) { output := make([]*api.Backup, 0, len(prefixes)) for _, backupDir := range prefixes { - backup, err := br.GetBackup(bucket, backupDir) + backup, err := GetBackup(objectStore, bucket, backupDir) if err != nil { - br.logger.WithError(err).WithField("dir", backupDir).Error("Error reading backup directory") + logger.WithError(err).WithField("dir", backupDir).Error("Error reading backup directory") continue } @@ -203,10 +140,11 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) { return output, nil } -func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, error) { +// GetBackup gets the specified api.Backup from the given bucket in object storage. +func GetBackup(objectStore ObjectStore, bucket, backupName string) (*api.Backup, error) { key := getMetadataKey(backupName) - res, err := br.objectStore.GetObject(bucket, key) + res, err := objectStore.GetObject(bucket, key) if err != nil { return nil, err } @@ -217,7 +155,8 @@ func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, erro return nil, errors.WithStack(err) } - obj, _, err := br.decoder.Decode(data, nil, nil) + decoder := scheme.Codecs.UniversalDecoder(api.SchemeGroupVersion) + obj, _, err := decoder.Decode(data, nil, nil) if err != nil { return nil, errors.WithStack(err) } @@ -230,19 +169,20 @@ func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, erro return backup, nil } -func (br *backupService) DeleteBackupDir(bucket, backupName string) error { - objects, err := br.objectStore.ListObjects(bucket, backupName+"/") +// DeleteBackupDir deletes all files in object storage for the given backup. +func DeleteBackupDir(logger logrus.FieldLogger, objectStore ObjectStore, bucket, backupName string) error { + objects, err := objectStore.ListObjects(bucket, backupName+"/") if err != nil { return err } var errs []error for _, key := range objects { - br.logger.WithFields(logrus.Fields{ + logger.WithFields(logrus.Fields{ "bucket": bucket, "key": key, }).Debug("Trying to delete object") - if err := br.objectStore.DeleteObject(bucket, key); err != nil { + if err := objectStore.DeleteObject(bucket, key); err != nil { errs = append(errs, err) } } @@ -250,51 +190,55 @@ func (br *backupService) DeleteBackupDir(bucket, backupName string) error { return errors.WithStack(kerrors.NewAggregate(errs)) } -func (br *backupService) CreateSignedURL(target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) { +// CreateSignedURL creates a pre-signed URL that can be used to download a file from object +// storage. The URL expires after ttl. +func CreateSignedURL(objectStore ObjectStore, target api.DownloadTarget, bucket, directory string, ttl time.Duration) (string, error) { switch target.Kind { case api.DownloadTargetKindBackupContents: - return br.objectStore.CreateSignedURL(bucket, getBackupContentsKey(directory, target.Name), ttl) + return objectStore.CreateSignedURL(bucket, getBackupContentsKey(directory, target.Name), ttl) case api.DownloadTargetKindBackupLog: - return br.objectStore.CreateSignedURL(bucket, getBackupLogKey(directory, target.Name), ttl) + return objectStore.CreateSignedURL(bucket, getBackupLogKey(directory, target.Name), ttl) case api.DownloadTargetKindRestoreLog: - return br.objectStore.CreateSignedURL(bucket, getRestoreLogKey(directory, target.Name), ttl) + return objectStore.CreateSignedURL(bucket, getRestoreLogKey(directory, target.Name), ttl) case api.DownloadTargetKindRestoreResults: - return br.objectStore.CreateSignedURL(bucket, getRestoreResultsKey(directory, target.Name), ttl) + return objectStore.CreateSignedURL(bucket, getRestoreResultsKey(directory, target.Name), ttl) default: return "", errors.Errorf("unsupported download target kind %q", target.Kind) } } -func (br *backupService) UploadRestoreLog(bucket, backup, restore string, log io.Reader) error { +// UploadRestoreLog uploads the restore's log file to object storage. +func UploadRestoreLog(objectStore ObjectStore, bucket, backup, restore string, log io.Reader) error { key := getRestoreLogKey(backup, restore) - return br.objectStore.PutObject(bucket, key, log) + return objectStore.PutObject(bucket, key, log) } -func (br *backupService) UploadRestoreResults(bucket, backup, restore string, results io.Reader) error { +// UploadRestoreResults uploads the restore's results file to object storage. +func UploadRestoreResults(objectStore ObjectStore, bucket, backup, restore string, results io.Reader) error { key := getRestoreResultsKey(backup, restore) - return br.objectStore.PutObject(bucket, key, results) + return objectStore.PutObject(bucket, key, results) } // cachedBackupService wraps a real backup service with a cache for getting cloud backups. -type cachedBackupService struct { - BackupService - cache BackupGetter -} - -// NewBackupServiceWithCachedBackupGetter returns a BackupService that uses a cache for -// GetAllBackups(). -func NewBackupServiceWithCachedBackupGetter( - ctx context.Context, - delegate BackupService, - resyncPeriod time.Duration, - logger logrus.FieldLogger, -) BackupService { - return &cachedBackupService{ - BackupService: delegate, - cache: NewBackupCache(ctx, delegate, resyncPeriod, logger), - } -} - -func (c *cachedBackupService) GetAllBackups(bucketName string) ([]*api.Backup, error) { - return c.cache.GetAllBackups(bucketName) -} +// type cachedBackupService struct { +// BackupService +// cache BackupGetter +// } + +// // NewBackupServiceWithCachedBackupGetter returns a BackupService that uses a cache for +// // GetAllBackups(). +// func NewBackupServiceWithCachedBackupGetter( +// ctx context.Context, +// delegate BackupService, +// resyncPeriod time.Duration, +// logger logrus.FieldLogger, +// ) BackupService { +// return &cachedBackupService{ +// BackupService: delegate, +// cache: NewBackupCache(ctx, delegate, resyncPeriod, logger), +// } +// } + +// func (c *cachedBackupService) GetAllBackups(bucketName string) ([]*api.Backup, error) { +// return c.cache.GetAllBackups(bucketName) +// } diff --git a/pkg/cloudprovider/gcp/block_store.go b/pkg/cloudprovider/gcp/block_store.go index 47e3900f60..ac874990c2 100644 --- a/pkg/cloudprovider/gcp/block_store.go +++ b/pkg/cloudprovider/gcp/block_store.go @@ -42,8 +42,12 @@ type blockStore struct { log logrus.FieldLogger } -func NewBlockStore(log logrus.FieldLogger) cloudprovider.BlockStore { - return &blockStore{log: log} +func NewBlockStore() cloudprovider.BlockStore { + return &blockStore{} +} + +func (b *blockStore) SetLog(log logrus.FieldLogger) { + b.log = log } func (b *blockStore) Init(config map[string]string) error { diff --git a/pkg/cloudprovider/gcp/object_store.go b/pkg/cloudprovider/gcp/object_store.go index 6a01005643..238808c7f3 100644 --- a/pkg/cloudprovider/gcp/object_store.go +++ b/pkg/cloudprovider/gcp/object_store.go @@ -43,10 +43,12 @@ type objectStore struct { privateKey []byte } -func NewObjectStore(log logrus.FieldLogger) cloudprovider.ObjectStore { - return &objectStore{ - log: log, - } +func NewObjectStore() cloudprovider.ObjectStore { + return &objectStore{} +} + +func (o *objectStore) SetLog(log logrus.FieldLogger) { + o.log = log } func (o *objectStore) Init(config map[string]string) error { diff --git a/pkg/cmd/server/plugin/plugin.go b/pkg/cmd/server/plugin/plugin.go index 74dbb0ec72..71b13c41c0 100644 --- a/pkg/cmd/server/plugin/plugin.go +++ b/pkg/cmd/server/plugin/plugin.go @@ -21,7 +21,6 @@ import ( "github.com/heptio/ark/pkg/backup" "github.com/heptio/ark/pkg/client" - "github.com/heptio/ark/pkg/cloudprovider" "github.com/heptio/ark/pkg/cloudprovider/aws" "github.com/heptio/ark/pkg/cloudprovider/azure" "github.com/heptio/ark/pkg/cloudprovider/gcp" @@ -39,58 +38,83 @@ func NewCommand(f client.Factory) *cobra.Command { Run: func(c *cobra.Command, args []string) { logger.Debug("Executing run-plugin command") - arkplugin.NewSimpleServer(). - RegisterObjectStores(map[string]func() cloudprovider.ObjectStore{ - "aws": func() cloudprovider.ObjectStore { - return aws.NewObjectStore(logger) - }, - "azure": azure.NewObjectStore, - "gcp": func() cloudprovider.ObjectStore { - return gcp.NewObjectStore(logger) - }, - }). - RegisterBlockStores(map[string]func() cloudprovider.BlockStore{ - "aws": aws.NewBlockStore, - "azure": azure.NewBlockStore, - "gcp": func() cloudprovider.BlockStore { - return gcp.NewBlockStore(logger) - }, - }). - RegisterBackupItemActions(map[string]func() backup.ItemAction{ - "pv": func() backup.ItemAction { - return backup.NewBackupPVAction(logger) - }, - "pod": func() backup.ItemAction { - return backup.NewPodAction(logger) - }, - "serviceaccount": func() backup.ItemAction { - clientset, err := f.KubeClient() - if err != nil { - panic(err) - } - - action, err := backup.NewServiceAccountAction(logger, clientset.RbacV1().ClusterRoleBindings()) - if err != nil { - panic(err) - } - - return action - }, - }). - RegisterRestoreItemActions(map[string]func() restore.ItemAction{ - "job": func() restore.ItemAction { - return restore.NewJobAction(logger) - }, - "pod": func() restore.ItemAction { - return restore.NewPodAction(logger) - }, - "service": func() restore.ItemAction { - return restore.NewServiceAction(logger) - }, - }). + arkplugin.NewSimpleServer(logger). + RegisterObjectStore("aws", newAwsObjectStore). + RegisterObjectStore("azure", newAzureObjectStore). + RegisterObjectStore("gcp", newGcpObjectStore). + RegisterBlockStore("aws", newAwsBlockStore). + RegisterBlockStore("azure", newAzureBlockStore). + RegisterBlockStore("gcp", newGcpBlockStore). + RegisterBackupItemAction("pv", newPVBackupItemAction). + RegisterBackupItemAction("pod", newPodBackupItemAction). + RegisterBackupItemAction("serviceaccount", newServiceAccountBackupItemAction(f)). + RegisterRestoreItemAction("job", newJobRestoreItemAction). + RegisterRestoreItemAction("pod", newPodRestoreItemAction). + RegisterRestoreItemAction("service", newServiceRestoreItemAction). Serve() }, } return c } + +func newAwsObjectStore() (interface{}, error) { + return aws.NewObjectStore(), nil +} + +func newAzureObjectStore() (interface{}, error) { + return azure.NewObjectStore(), nil +} + +func newGcpObjectStore() (interface{}, error) { + return gcp.NewObjectStore(), nil +} + +func newAwsBlockStore() (interface{}, error) { + return aws.NewBlockStore(), nil +} + +func newAzureBlockStore() (interface{}, error) { + return azure.NewBlockStore(), nil +} + +func newGcpBlockStore() (interface{}, error) { + return gcp.NewBlockStore(), nil +} + +func newPVBackupItemAction() (interface{}, error) { + return backup.NewBackupPVAction(), nil +} + +func newPodBackupItemAction() (interface{}, error) { + return backup.NewPodAction(), nil +} + +func newServiceAccountBackupItemAction(f client.Factory) arkplugin.ServerImplFactory { + return func() (interface{}, error) { + // TODO(ncdc): consider a k8s style WantsKubernetesClientSet initialization approach + clientset, err := f.KubeClient() + if err != nil { + return nil, err + } + + action, err := backup.NewServiceAccountAction(clientset.RbacV1().ClusterRoleBindings()) + if err != nil { + return nil, err + } + + return action, nil + } +} + +func newJobRestoreItemAction() (interface{}, error) { + return restore.NewJobAction(), nil +} + +func newPodRestoreItemAction() (interface{}, error) { + return restore.NewPodAction(), nil +} + +func newServiceRestoreItemAction() (interface{}, error) { + return restore.NewServiceAction(), nil +} diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index 614eebcb54..9dcf8615de 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -172,7 +172,6 @@ type server struct { kubeClientConfig *rest.Config kubeClient kubernetes.Interface arkClient clientset.Interface - backupService cloudprovider.BackupService snapshotService cloudprovider.SnapshotService discoveryClient discovery.DiscoveryInterface clientPool dynamic.ClientPool @@ -249,9 +248,9 @@ func (s *server) run() error { s.watchConfig(originalConfig) - if err := s.initBackupService(config); err != nil { - return err - } + // if err := s.initBackupService(config); err != nil { + // return err + // } if err := s.initSnapshotService(config); err != nil { return err @@ -387,16 +386,16 @@ func (s *server) handleShutdownSignals() { }() } -func (s *server) initBackupService(config *api.Config) error { - s.logger.Info("Configuring cloud provider for backup service") - objectStore, err := getObjectStore(config.BackupStorageProvider.CloudProviderConfig, s.pluginManager) - if err != nil { - return err - } +// func (s *server) initBackupService(config *api.Config) error { +// s.logger.Info("Configuring cloud provider for backup service") +// objectStore, err := getObjectStore(config.BackupStorageProvider.CloudProviderConfig, s.pluginManager) +// if err != nil { +// return err +// } - s.backupService = cloudprovider.NewBackupService(objectStore, s.logger) - return nil -} +// s.backupService = cloudprovider.NewBackupService(objectStore, s.logger) +// return nil +// } func (s *server) initSnapshotService(config *api.Config) error { if config.PersistentVolumeProvider == nil { diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index cd48d48e63..5dda6b4f64 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -54,26 +54,26 @@ import ( const backupVersion = 1 type backupController struct { - backupper backup.Backupper - backupService cloudprovider.BackupService - bucket string - pvProviderExists bool - lister listers.BackupLister - listerSynced cache.InformerSynced - client arkv1client.BackupsGetter - syncHandler func(backupName string) error - queue workqueue.RateLimitingInterface - clock clock.Clock - logger logrus.FieldLogger - pluginRegistry plugin.Registry - backupTracker BackupTracker + backupper backup.Backupper + objectStoreConfig api.CloudProviderConfig + bucket string + pvProviderExists bool + lister listers.BackupLister + listerSynced cache.InformerSynced + client arkv1client.BackupsGetter + syncHandler func(backupName string) error + queue workqueue.RateLimitingInterface + clock clock.Clock + logger logrus.FieldLogger + pluginRegistry plugin.Registry + backupTracker BackupTracker } func NewBackupController( backupInformer informers.BackupInformer, client arkv1client.BackupsGetter, backupper backup.Backupper, - backupService cloudprovider.BackupService, + objectStoreConfig api.CloudProviderConfig, bucket string, pvProviderExists bool, logger logrus.FieldLogger, @@ -81,18 +81,18 @@ func NewBackupController( backupTracker BackupTracker, ) Interface { c := &backupController{ - backupper: backupper, - backupService: backupService, - bucket: bucket, - pvProviderExists: pvProviderExists, - lister: backupInformer.Lister(), - listerSynced: backupInformer.Informer().HasSynced, - client: client, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"), - clock: &clock.RealClock{}, - logger: logger, - pluginRegistry: pluginRegistry, - backupTracker: backupTracker, + backupper: backupper, + objectStoreConfig: objectStoreConfig, + bucket: bucket, + pvProviderExists: pvProviderExists, + lister: backupInformer.Lister(), + listerSynced: backupInformer.Informer().HasSynced, + client: client, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "backup"), + clock: &clock.RealClock{}, + logger: logger, + pluginRegistry: pluginRegistry, + backupTracker: backupTracker, } c.syncHandler = c.processBackup @@ -348,6 +348,11 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string) return err } + objectStore, err := getObjectStore(controller.objectStoreConfig, pluginManager) + if err != nil { + return err + } + var errs []error var backupJsonToUpload, backupFileToUpload io.Reader @@ -370,7 +375,7 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string) backupFileToUpload = backupFile } - if err := controller.backupService.UploadBackup(bucket, backup.Name, backupJsonToUpload, backupFileToUpload, logFile); err != nil { + if err := controller.UploadBackup(log, objectStore, bucket, backup.Name, backupJsonToUpload, backupFileToUpload, logFile); err != nil { errs = append(errs, err) } @@ -379,6 +384,48 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string) return kerrors.NewAggregate(errs) } +func (controller *backupController) UploadBackup(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string, metadata, backup, log io.Reader) error { + if err := cloudprovider.UploadBackupLog(objectStore, bucket, backupName, log); err != nil { + // Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the + // backup's status. + logger.WithError(err).WithField("bucket", bucket).Error("Error uploading log file") + } + + if metadata == nil { + // If we don't have metadata, something failed, and there's no point in continuing. An object + // storage bucket that is missing the metadata file can't be restored, nor can its logs be + // viewed. + return nil + } + + // upload metadata file + if err := cloudprovider.UploadBackupMetadata(objectStore, bucket, backupName, metadata); err != nil { + // failure to upload metadata file is a hard-stop + return err + } + + // upload tar file + return cloudprovider.UploadBackup(objectStore, bucket, backupName, backup) +} + +// TODO(ncdc): move this to a better location that isn't backup specific +func getObjectStore(cloudConfig api.CloudProviderConfig, manager plugin.Manager) (cloudprovider.ObjectStore, error) { + if cloudConfig.Name == "" { + return nil, errors.New("object storage provider name must not be empty") + } + + objectStore, err := manager.GetObjectStore(cloudConfig.Name) + if err != nil { + return nil, err + } + + if err := objectStore.Init(cloudConfig.Config); err != nil { + return nil, err + } + + return objectStore, nil +} + func closeAndRemoveFile(file *os.File, log logrus.FieldLogger) { if err := file.Close(); err != nil { log.WithError(err).WithField("file", file.Name()).Error("error closing file") diff --git a/pkg/controller/backup_deletion_controller.go b/pkg/controller/backup_deletion_controller.go index 06ba01bc9a..538a877096 100644 --- a/pkg/controller/backup_deletion_controller.go +++ b/pkg/controller/backup_deletion_controller.go @@ -45,7 +45,7 @@ type backupDeletionController struct { deleteBackupRequestLister listers.DeleteBackupRequestLister backupClient arkv1client.BackupsGetter snapshotService cloudprovider.SnapshotService - backupService cloudprovider.BackupService + objectStore cloudprovider.ObjectStore bucket string restoreLister listers.RestoreLister restoreClient arkv1client.RestoresGetter @@ -62,7 +62,7 @@ func NewBackupDeletionController( deleteBackupRequestClient arkv1client.DeleteBackupRequestsGetter, backupClient arkv1client.BackupsGetter, snapshotService cloudprovider.SnapshotService, - backupService cloudprovider.BackupService, + objectStore cloudprovider.ObjectStore, bucket string, restoreInformer informers.RestoreInformer, restoreClient arkv1client.RestoresGetter, @@ -74,7 +74,7 @@ func NewBackupDeletionController( deleteBackupRequestLister: deleteBackupRequestInformer.Lister(), backupClient: backupClient, snapshotService: snapshotService, - backupService: backupService, + objectStore: objectStore, bucket: bucket, restoreLister: restoreInformer.Lister(), restoreClient: restoreClient, @@ -226,7 +226,7 @@ func (c *backupDeletionController) processRequest(req *v1.DeleteBackupRequest) e // Try to delete backup from object storage log.Info("Removing backup from object storage") - if err := c.backupService.DeleteBackupDir(c.bucket, backup.Name); err != nil { + if err := cloudprovider.DeleteBackupDir(log, c.objectStore, c.bucket, backup.Name); err != nil { errs = append(errs, errors.Wrap(err, "error deleting backup from object storage").Error()) } diff --git a/pkg/controller/backup_sync_controller.go b/pkg/controller/backup_sync_controller.go index 638d93e21d..628fbb852d 100644 --- a/pkg/controller/backup_sync_controller.go +++ b/pkg/controller/backup_sync_controller.go @@ -33,17 +33,17 @@ import ( ) type backupSyncController struct { - client arkv1client.BackupsGetter - backupService cloudprovider.BackupService - bucket string - syncPeriod time.Duration - namespace string - logger logrus.FieldLogger + client arkv1client.BackupsGetter + backupGetter cloudprovider.BackupGetter + bucket string + syncPeriod time.Duration + namespace string + logger logrus.FieldLogger } func NewBackupSyncController( client arkv1client.BackupsGetter, - backupService cloudprovider.BackupService, + backupGetter cloudprovider.BackupGetter, bucket string, syncPeriod time.Duration, namespace string, @@ -54,12 +54,12 @@ func NewBackupSyncController( syncPeriod = time.Minute } return &backupSyncController{ - client: client, - backupService: backupService, - bucket: bucket, - syncPeriod: syncPeriod, - namespace: namespace, - logger: logger, + client: client, + backupGetter: backupGetter, + bucket: bucket, + syncPeriod: syncPeriod, + namespace: namespace, + logger: logger, } } @@ -76,7 +76,7 @@ const gcFinalizer = "gc.ark.heptio.com" func (c *backupSyncController) run() { c.logger.Info("Syncing backups from object storage") - backups, err := c.backupService.GetAllBackups(c.bucket) + backups, err := c.backupGetter.GetAllBackups(c.bucket) if err != nil { c.logger.WithError(err).Error("error listing backups") return diff --git a/pkg/controller/download_request_controller.go b/pkg/controller/download_request_controller.go index 94e95cd2a2..7d9a844911 100644 --- a/pkg/controller/download_request_controller.go +++ b/pkg/controller/download_request_controller.go @@ -49,7 +49,7 @@ type downloadRequestController struct { downloadRequestListerSynced cache.InformerSynced restoreLister listers.RestoreLister restoreListerSynced cache.InformerSynced - backupService cloudprovider.BackupService + objectStore cloudprovider.ObjectStore bucket string syncHandler func(key string) error queue workqueue.RateLimitingInterface @@ -62,7 +62,7 @@ func NewDownloadRequestController( downloadRequestClient arkv1client.DownloadRequestsGetter, downloadRequestInformer informers.DownloadRequestInformer, restoreInformer informers.RestoreInformer, - backupService cloudprovider.BackupService, + objectStore cloudprovider.ObjectStore, bucket string, logger logrus.FieldLogger, ) Interface { @@ -72,7 +72,7 @@ func NewDownloadRequestController( downloadRequestListerSynced: downloadRequestInformer.Informer().HasSynced, restoreLister: restoreInformer.Lister(), restoreListerSynced: restoreInformer.Informer().HasSynced, - backupService: backupService, + objectStore: objectStore, bucket: bucket, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "downloadrequest"), clock: &clock.RealClock{}, @@ -236,7 +236,7 @@ func (c *downloadRequestController) generatePreSignedURL(downloadRequest *v1.Dow directory = downloadRequest.Spec.Target.Name } - update.Status.DownloadURL, err = c.backupService.CreateSignedURL(downloadRequest.Spec.Target, c.bucket, directory, signedURLTTL) + update.Status.DownloadURL, err = cloudprovider.CreateSignedURL(c.objectStore, downloadRequest.Spec.Target, c.bucket, directory, signedURLTTL) if err != nil { return err } diff --git a/pkg/controller/restore_controller.go b/pkg/controller/restore_controller.go index cc832b2799..17c8acf224 100644 --- a/pkg/controller/restore_controller.go +++ b/pkg/controller/restore_controller.go @@ -58,7 +58,7 @@ type restoreController struct { restoreClient arkv1client.RestoresGetter backupClient arkv1client.BackupsGetter restorer restore.Restorer - backupService cloudprovider.BackupService + objectStoreConfig api.CloudProviderConfig bucket string pvProviderExists bool backupLister listers.BackupLister @@ -77,7 +77,7 @@ func NewRestoreController( restoreClient arkv1client.RestoresGetter, backupClient arkv1client.BackupsGetter, restorer restore.Restorer, - backupService cloudprovider.BackupService, + objectStoreConfig api.CloudProviderConfig, bucket string, backupInformer informers.BackupInformer, pvProviderExists bool, @@ -89,7 +89,7 @@ func NewRestoreController( restoreClient: restoreClient, backupClient: backupClient, restorer: restorer, - backupService: backupService, + objectStoreConfig: objectStoreConfig, bucket: bucket, pvProviderExists: pvProviderExists, backupLister: backupInformer.Lister(), @@ -211,7 +211,9 @@ func (controller *restoreController) processRestore(key string) error { logContext.Debug("Running processRestore") ns, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - return errors.Wrap(err, "error splitting queue key") + logContext.WithError(err).Error("unable to process restore: error splitting queue key") + // Return nil here so we don't try to process the key any more + return nil } logContext.Debug("Getting Restore") @@ -247,8 +249,21 @@ func (controller *restoreController) processRestore(key string) error { } } + pluginManager := plugin.NewManager(logContext, logContext.Level, controller.pluginRegistry) + defer pluginManager.CleanupClients() + + objectStore, err := getObjectStore(controller.objectStoreConfig, pluginManager) + if err != nil { + return errors.Wrap(err, "error initializing object store") + } + + actions, err := pluginManager.GetRestoreItemActions() + if err != nil { + return errors.Wrap(err, "error initializing restore item actions") + } + // validation - if restore.Status.ValidationErrors = controller.getValidationErrors(restore); len(restore.Status.ValidationErrors) > 0 { + if restore.Status.ValidationErrors = controller.getValidationErrors(objectStore, restore); len(restore.Status.ValidationErrors) > 0 { restore.Status.Phase = api.RestorePhaseFailedValidation } else { restore.Status.Phase = api.RestorePhaseInProgress @@ -269,7 +284,7 @@ func (controller *restoreController) processRestore(key string) error { logContext.Debug("Running restore") // execution & upload of restore - restoreWarnings, restoreErrors := controller.runRestore(restore, controller.bucket) + restoreWarnings, restoreErrors := controller.runRestore(restore, actions, objectStore, controller.bucket) restore.Status.Warnings = len(restoreWarnings.Ark) + len(restoreWarnings.Cluster) for _, w := range restoreWarnings.Namespaces { @@ -292,12 +307,12 @@ func (controller *restoreController) processRestore(key string) error { return nil } -func (controller *restoreController) getValidationErrors(itm *api.Restore) []string { +func (controller *restoreController) getValidationErrors(objectStore cloudprovider.ObjectStore, itm *api.Restore) []string { var validationErrors []string if itm.Spec.BackupName == "" { validationErrors = append(validationErrors, "BackupName must be non-empty and correspond to the name of a backup in object storage.") - } else if _, err := controller.fetchBackup(controller.bucket, itm.Spec.BackupName); err != nil { + } else if _, err := controller.fetchBackup(objectStore, controller.bucket, itm.Spec.BackupName); err != nil { validationErrors = append(validationErrors, fmt.Sprintf("Error retrieving backup: %v", err)) } @@ -323,7 +338,7 @@ func (controller *restoreController) getValidationErrors(itm *api.Restore) []str return validationErrors } -func (controller *restoreController) fetchBackup(bucket, name string) (*api.Backup, error) { +func (controller *restoreController) fetchBackup(objectStore cloudprovider.ObjectStore, bucket, name string) (*api.Backup, error) { backup, err := controller.backupLister.Backups(controller.namespace).Get(name) if err == nil { return backup, nil @@ -336,7 +351,7 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back logContext := controller.logger.WithField("backupName", name) logContext.Debug("Backup not found in backupLister, checking object storage directly") - backup, err = controller.backupService.GetBackup(bucket, name) + backup, err = cloudprovider.GetBackup(objectStore, bucket, name) if err != nil { return nil, err } @@ -356,14 +371,14 @@ func (controller *restoreController) fetchBackup(bucket, name string) (*api.Back return backup, nil } -func (controller *restoreController) runRestore(restore *api.Restore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) { +func (controller *restoreController) runRestore(restore *api.Restore, actions []restore.ItemAction, objectStore cloudprovider.ObjectStore, bucket string) (restoreWarnings, restoreErrors api.RestoreResult) { logContext := controller.logger.WithFields( logrus.Fields{ "restore": kubeutil.NamespaceAndName(restore), "backup": restore.Spec.BackupName, }) - backup, err := controller.fetchBackup(bucket, restore.Spec.BackupName) + backup, err := controller.fetchBackup(objectStore, bucket, restore.Spec.BackupName) if err != nil { logContext.WithError(err).Error("Error getting backup") restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) @@ -372,7 +387,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str var tempFiles []*os.File - backupFile, err := downloadToTempFile(restore.Spec.BackupName, controller.backupService, bucket, controller.logger) + backupFile, err := downloadToTempFile(restore.Spec.BackupName, objectStore, bucket, controller.logger) if err != nil { logContext.WithError(err).Error("Error downloading backup") restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) @@ -408,15 +423,6 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str } }() - pluginManager := plugin.NewManager(logContext, logContext.Level, controller.pluginRegistry) - defer pluginManager.CleanupClients() - - actions, err := pluginManager.GetRestoreItemActions() - if err != nil { - restoreErrors.Ark = append(restoreErrors.Ark, err.Error()) - return - } - logContext.Info("starting restore") restoreWarnings, restoreErrors = controller.restorer.Restore(restore, backup, backupFile, logFile, actions) logContext.Info("restore completed") @@ -429,7 +435,7 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str return } - if err := controller.backupService.UploadRestoreLog(bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil { + if err := cloudprovider.UploadRestoreLog(objectStore, bucket, restore.Spec.BackupName, restore.Name, logFile); err != nil { restoreErrors.Ark = append(restoreErrors.Ark, fmt.Sprintf("error uploading log file to object storage: %v", err)) } @@ -450,15 +456,15 @@ func (controller *restoreController) runRestore(restore *api.Restore, bucket str logContext.WithError(errors.WithStack(err)).Error("Error resetting results file offset to 0") return } - if err := controller.backupService.UploadRestoreResults(bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil { + if err := cloudprovider.UploadRestoreResults(objectStore, bucket, restore.Spec.BackupName, restore.Name, resultsFile); err != nil { logContext.WithError(errors.WithStack(err)).Error("Error uploading results files to object storage") } return } -func downloadToTempFile(backupName string, backupService cloudprovider.BackupService, bucket string, logger logrus.FieldLogger) (*os.File, error) { - readCloser, err := backupService.DownloadBackup(bucket, backupName) +func downloadToTempFile(backupName string, objectStore cloudprovider.ObjectStore, bucket string, logger logrus.FieldLogger) (*os.File, error) { + readCloser, err := cloudprovider.DownloadBackup(objectStore, bucket, backupName) if err != nil { return nil, err } diff --git a/pkg/plugin/backup_item_action.go b/pkg/plugin/backup_item_action.go index 7168b6deea..d39fb20f56 100644 --- a/pkg/plugin/backup_item_action.go +++ b/pkg/plugin/backup_item_action.go @@ -19,9 +19,8 @@ package plugin import ( "encoding/json" - "k8s.io/apimachinery/pkg/util/sets" - "github.com/hashicorp/go-plugin" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" @@ -41,67 +40,40 @@ import ( type BackupItemActionPlugin struct { plugin.NetRPCUnsupportedPlugin - mux map[string]func() arkbackup.ItemAction + *serverImplFactoryMux + log *logrusAdapter } // NewBackupItemActionPlugin constructs a BackupItemActionPlugin. func NewBackupItemActionPlugin() *BackupItemActionPlugin { return &BackupItemActionPlugin{ - mux: make(map[string]func() arkbackup.ItemAction), + serverImplFactoryMux: newServerImplFactoryMux(), } } -func (p *BackupItemActionPlugin) Add(name string, f func() arkbackup.ItemAction) *BackupItemActionPlugin { - p.mux[name] = f - return p -} - -func (p *BackupItemActionPlugin) Names() []string { - return sets.StringKeySet(p.mux).List() -} - -// GRPCServer registers a BackupItemAction gRPC server. -func (p *BackupItemActionPlugin) GRPCServer(s *grpc.Server) error { - proto.RegisterBackupItemActionServer(s, &BackupItemActionGRPCServer{mux: p.mux, impls: make(map[string]arkbackup.ItemAction)}) - return nil -} +////////////////////////////////////////////////////////////////////////////// +// client code +////////////////////////////////////////////////////////////////////////////// // GRPCClient returns a BackupItemAction gRPC client. func (p *BackupItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { - return &backupItemClientMux{ - grpcClient: proto.NewBackupItemActionClient(c), - log: p.log, - clients: make(map[string]*BackupItemActionGRPCClient), - }, nil + return newClientMux(c, newBackupItemActionGRPCClient), nil } // BackupItemActionGRPCClient implements the backup/ItemAction interface and uses a // gRPC client to make calls to the plugin server. type BackupItemActionGRPCClient struct { + *xxxBase grpcClient proto.BackupItemActionClient - log *logrusAdapter - plugin string } -type backupItemClientMux struct { - grpcClient proto.BackupItemActionClient - log *logrusAdapter - clients map[string]*BackupItemActionGRPCClient +func newBackupItemActionGRPCClient() xxx { + return &BackupItemActionGRPCClient{xxxBase: &xxxBase{}} } -// clientFor returns a BackupItemActionGRPCClient for the BackupItemAction with the given name. -func (m *backupItemClientMux) clientFor(name string) interface{} { - if client, found := m.clients[name]; found { - return client - } - client := &BackupItemActionGRPCClient{ - plugin: name, - grpcClient: m.grpcClient, - log: m.log, - } - m.clients[name] = client - return client +func (c *BackupItemActionGRPCClient) setGrpcClient(clientConn *grpc.ClientConn) { + c.grpcClient = proto.NewBackupItemActionClient(clientConn) } func (c *BackupItemActionGRPCClient) AppliesTo() (arkbackup.ResourceSelector, error) { @@ -168,24 +140,42 @@ func (c *BackupItemActionGRPCClient) SetLog(log logrus.FieldLogger) { c.log.impl = log } +////////////////////////////////////////////////////////////////////////////// +// server code +////////////////////////////////////////////////////////////////////////////// + +// GRPCServer registers a BackupItemAction gRPC server. +func (p *BackupItemActionPlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterBackupItemActionServer(s, &BackupItemActionGRPCServer{mux: p.serverImplFactoryMux}) + return nil +} + // BackupItemActionGRPCServer implements the proto-generated BackupItemActionServer interface, and accepts // gRPC calls and forwards them to an implementation of the pluggable interface. type BackupItemActionGRPCServer struct { - mux map[string]func() arkbackup.ItemAction - impls map[string]arkbackup.ItemAction + mux *serverImplFactoryMux } -func (s *BackupItemActionGRPCServer) getImpl(name string) arkbackup.ItemAction { - if impl, found := s.impls[name]; found { - return impl +func (s *BackupItemActionGRPCServer) getImpl(name string) (arkbackup.ItemAction, error) { + impl, err := s.mux.getImpl(name) + if err != nil { + return nil, err + } + + itemAction, ok := impl.(arkbackup.ItemAction) + if !ok { + return nil, errors.Errorf("%T is not a backup item action", impl) } - f := s.mux[name] - s.impls[name] = f() - return s.impls[name] + + return itemAction, nil } func (s *BackupItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.AppliesToRequest) (*proto.AppliesToResponse, error) { - impl := s.getImpl(req.Plugin) + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + resourceSelector, err := impl.AppliesTo() if err != nil { return nil, err @@ -201,6 +191,11 @@ func (s *BackupItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.A } func (s *BackupItemActionGRPCServer) Execute(ctx context.Context, req *proto.ExecuteRequest) (*proto.ExecuteResponse, error) { + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + var item unstructured.Unstructured var backup api.Backup @@ -211,7 +206,6 @@ func (s *BackupItemActionGRPCServer) Execute(ctx context.Context, req *proto.Exe return nil, err } - impl := s.getImpl(req.Plugin) updatedItem, additionalItems, err := impl.Execute(&item, &backup) if err != nil { return nil, err diff --git a/pkg/plugin/block_store.go b/pkg/plugin/block_store.go index c473af01cf..d2a3c5de7a 100644 --- a/pkg/plugin/block_store.go +++ b/pkg/plugin/block_store.go @@ -25,7 +25,6 @@ import ( "google.golang.org/grpc" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" "github.com/heptio/ark/pkg/cloudprovider" proto "github.com/heptio/ark/pkg/plugin/generated" @@ -37,64 +36,38 @@ import ( type BlockStorePlugin struct { plugin.NetRPCUnsupportedPlugin - mux map[string]func() cloudprovider.BlockStore + *serverImplFactoryMux } // NewBlockStorePlugin constructs a BlockStorePlugin. func NewBlockStorePlugin() *BlockStorePlugin { return &BlockStorePlugin{ - mux: make(map[string]func() cloudprovider.BlockStore), + serverImplFactoryMux: newServerImplFactoryMux(), } } -func (p *BlockStorePlugin) Add(name string, f func() cloudprovider.BlockStore) *BlockStorePlugin { - p.mux[name] = f - return p -} - -func (p *BlockStorePlugin) Names() []string { - return sets.StringKeySet(p.mux).List() -} - -// GRPCServer registers a BlockStore gRPC server. -func (p *BlockStorePlugin) GRPCServer(s *grpc.Server) error { - proto.RegisterBlockStoreServer(s, &BlockStoreGRPCServer{mux: p.mux, impls: make(map[string]cloudprovider.BlockStore)}) - return nil -} +////////////////////////////////////////////////////////////////////////////// +// client code +////////////////////////////////////////////////////////////////////////////// // GRPCClient returns a BlockStore gRPC client. func (p *BlockStorePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { - return &blockStoreClientMux{ - grpcClient: proto.NewBlockStoreClient(c), - clients: make(map[string]*BlockStoreGRPCClient), - }, nil + return newClientMux(c, newBlockStoreGRPCClient), nil } -type blockStoreClientMux struct { +// BlockStoreGRPCClient implements the cloudprovider.BlockStore interface and uses a +// gRPC client to make calls to the plugin server. +type BlockStoreGRPCClient struct { + *xxxBase grpcClient proto.BlockStoreClient - log *logrusAdapter - clients map[string]*BlockStoreGRPCClient } -func (m *blockStoreClientMux) GetByName(name string) interface{} { - if client, found := m.clients[name]; found { - return client - } - client := &BlockStoreGRPCClient{ - plugin: name, - grpcClient: m.grpcClient, - log: m.log, - } - m.clients[name] = client - return client +func newBlockStoreGRPCClient() xxx { + return &BlockStoreGRPCClient{xxxBase: &xxxBase{}} } -// BlockStoreGRPCClient implements the cloudprovider.BlockStore interface and uses a -// gRPC client to make calls to the plugin server. -type BlockStoreGRPCClient struct { - grpcClient proto.BlockStoreClient - log *logrusAdapter - plugin string +func (c *BlockStoreGRPCClient) setGrpcClient(clientConn *grpc.ClientConn) { + c.grpcClient = proto.NewBlockStoreClient(clientConn) } // Init prepares the BlockStore for usage using the provided map of @@ -226,26 +199,34 @@ func (c *BlockStoreGRPCClient) SetVolumeID(pv runtime.Unstructured, volumeID str return &updatedPV, nil } +////////////////////////////////////////////////////////////////////////////// +// server code +////////////////////////////////////////////////////////////////////////////// + +// GRPCServer registers a BlockStore gRPC server. +func (p *BlockStorePlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterBlockStoreServer(s, &BlockStoreGRPCServer{mux: p.serverImplFactoryMux}) + return nil +} + // BlockStoreGRPCServer implements the proto-generated BlockStoreServer interface, and accepts // gRPC calls and forwards them to an implementation of the pluggable interface. type BlockStoreGRPCServer struct { - mux map[string]func() cloudprovider.BlockStore - impls map[string]cloudprovider.BlockStore + mux *serverImplFactoryMux } func (s *BlockStoreGRPCServer) getImpl(name string) (cloudprovider.BlockStore, error) { - if impl, found := s.impls[name]; found { - return impl, nil + impl, err := s.mux.getImpl(name) + if err != nil { + return nil, err } - f, found := s.mux[name] - if !found { - return nil, errors.Errorf("unable to find BlockStore %s", name) + blockStore, ok := impl.(cloudprovider.BlockStore) + if !ok { + return nil, errors.Errorf("%T is not a block store", impl) } - s.impls[name] = f() - - return s.impls[name], nil + return blockStore, nil } // Init prepares the BlockStore for usage using the provided map of @@ -256,6 +237,7 @@ func (s *BlockStoreGRPCServer) Init(ctx context.Context, req *proto.InitRequest) if err != nil { return nil, err } + if err := impl.Init(req.Config); err != nil { return nil, err } @@ -266,6 +248,11 @@ func (s *BlockStoreGRPCServer) Init(ctx context.Context, req *proto.InitRequest) // CreateVolumeFromSnapshot creates a new block volume, initialized from the provided snapshot, // and with the specified type and IOPS (if using provisioned IOPS). func (s *BlockStoreGRPCServer) CreateVolumeFromSnapshot(ctx context.Context, req *proto.CreateVolumeRequest) (*proto.CreateVolumeResponse, error) { + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + snapshotID := req.SnapshotID volumeType := req.VolumeType volumeAZ := req.VolumeAZ @@ -275,10 +262,6 @@ func (s *BlockStoreGRPCServer) CreateVolumeFromSnapshot(ctx context.Context, req iops = &req.Iops } - impl, err := s.getImpl(req.Plugin) - if err != nil { - return nil, err - } volumeID, err := impl.CreateVolumeFromSnapshot(snapshotID, volumeType, volumeAZ, iops) if err != nil { return nil, err @@ -294,6 +277,7 @@ func (s *BlockStoreGRPCServer) GetVolumeInfo(ctx context.Context, req *proto.Get if err != nil { return nil, err } + volumeType, iops, err := impl.GetVolumeInfo(req.VolumeID, req.VolumeAZ) if err != nil { return nil, err @@ -316,6 +300,7 @@ func (s *BlockStoreGRPCServer) IsVolumeReady(ctx context.Context, req *proto.IsV if err != nil { return nil, err } + ready, err := impl.IsVolumeReady(req.VolumeID, req.VolumeAZ) if err != nil { return nil, err @@ -331,6 +316,7 @@ func (s *BlockStoreGRPCServer) CreateSnapshot(ctx context.Context, req *proto.Cr if err != nil { return nil, err } + snapshotID, err := impl.CreateSnapshot(req.VolumeID, req.VolumeAZ, req.Tags) if err != nil { return nil, err @@ -345,6 +331,7 @@ func (s *BlockStoreGRPCServer) DeleteSnapshot(ctx context.Context, req *proto.De if err != nil { return nil, err } + if err := impl.DeleteSnapshot(req.SnapshotID); err != nil { return nil, err } @@ -353,16 +340,17 @@ func (s *BlockStoreGRPCServer) DeleteSnapshot(ctx context.Context, req *proto.De } func (s *BlockStoreGRPCServer) GetVolumeID(ctx context.Context, req *proto.GetVolumeIDRequest) (*proto.GetVolumeIDResponse, error) { + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + var pv unstructured.Unstructured if err := json.Unmarshal(req.PersistentVolume, &pv); err != nil { return nil, err } - impl, err := s.getImpl(req.Plugin) - if err != nil { - return nil, err - } volumeID, err := impl.GetVolumeID(&pv) if err != nil { return nil, err @@ -372,16 +360,17 @@ func (s *BlockStoreGRPCServer) GetVolumeID(ctx context.Context, req *proto.GetVo } func (s *BlockStoreGRPCServer) SetVolumeID(ctx context.Context, req *proto.SetVolumeIDRequest) (*proto.SetVolumeIDResponse, error) { + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + var pv unstructured.Unstructured if err := json.Unmarshal(req.PersistentVolume, &pv); err != nil { return nil, err } - impl, err := s.getImpl(req.Plugin) - if err != nil { - return nil, err - } updatedPV, err := impl.SetVolumeID(&pv, req.VolumeID) if err != nil { return nil, err diff --git a/pkg/plugin/interface.go b/pkg/plugin/interface.go index 2cecf6c368..619e6cac14 100644 --- a/pkg/plugin/interface.go +++ b/pkg/plugin/interface.go @@ -22,7 +22,7 @@ import plugin "github.com/hashicorp/go-plugin" type Interface interface { plugin.Plugin - Names() []string + names() []string } type GetByNamer interface { diff --git a/pkg/plugin/manager.go b/pkg/plugin/manager.go index 09228c0595..5816615dc8 100644 --- a/pkg/plugin/manager.go +++ b/pkg/plugin/manager.go @@ -44,6 +44,8 @@ type Manager interface { // CloseBackupItemActions(). GetBackupItemActions() ([]backup.ItemAction, error) + GetBackupItemAction(name string) (backup.ItemAction, error) + // GetRestoreItemActions returns all restore.ItemAction plugins. // These plugin instances should ONLY be used for a single restore // (mainly because each one outputs to a per-restore log), @@ -159,18 +161,28 @@ func (m *manager) GetBackupItemActions() ([]backup.ItemAction, error) { for i := range list { id := list[i] - wrapper, err := m.getWrapper(PluginKindBackupItemAction, id.Name) + + r, err := m.GetBackupItemAction(id.Name) if err != nil { return nil, err } - r := newResumableBackupItemAction(id.Name, wrapper) actions = append(actions, r) } return actions, nil } +func (m *manager) GetBackupItemAction(name string) (backup.ItemAction, error) { + wrapper, err := m.getWrapper(PluginKindBackupItemAction, name) + if err != nil { + return nil, err + } + + r := newResumableBackupItemAction(name, wrapper) + return r, nil +} + func (m *manager) GetRestoreItemActions() ([]restore.ItemAction, error) { list := m.registry.List(PluginKindRestoreItemAction) diff --git a/pkg/plugin/object_store.go b/pkg/plugin/object_store.go index eaf8e5ac78..68a0443dac 100644 --- a/pkg/plugin/object_store.go +++ b/pkg/plugin/object_store.go @@ -21,10 +21,10 @@ import ( "time" "github.com/hashicorp/go-plugin" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/net/context" "google.golang.org/grpc" - "k8s.io/apimachinery/pkg/util/sets" "github.com/heptio/ark/pkg/cloudprovider" proto "github.com/heptio/ark/pkg/plugin/generated" @@ -38,66 +38,41 @@ const byteChunkSize = 16384 type ObjectStorePlugin struct { plugin.NetRPCUnsupportedPlugin - mux map[string]func() cloudprovider.ObjectStore + *serverImplFactoryMux + log *logrusAdapter } // NewObjectStorePlugin construct an ObjectStorePlugin. func NewObjectStorePlugin() *ObjectStorePlugin { return &ObjectStorePlugin{ - mux: make(map[string]func() cloudprovider.ObjectStore), + serverImplFactoryMux: newServerImplFactoryMux(), } } -func (p *ObjectStorePlugin) Add(name string, f func() cloudprovider.ObjectStore) *ObjectStorePlugin { - p.mux[name] = f - return p -} - -func (p *ObjectStorePlugin) Names() []string { - return sets.StringKeySet(p.mux).List() -} - -// GRPCServer registers an ObjectStore gRPC server. -func (p *ObjectStorePlugin) GRPCServer(s *grpc.Server) error { - proto.RegisterObjectStoreServer(s, &ObjectStoreGRPCServer{mux: p.mux, impls: make(map[string]cloudprovider.ObjectStore)}) - return nil -} +////////////////////////////////////////////////////////////////////////////// +// client code +////////////////////////////////////////////////////////////////////////////// // GRPCClient returns an ObjectStore gRPC client. func (p *ObjectStorePlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { - return &objectStoreClientMux{ - grpcClient: proto.NewObjectStoreClient(c), - log: p.log, - clients: make(map[string]*ObjectStoreGRPCClient), - }, nil + return newClientMux(c, newObjectStoreGRPCClient), nil + } // ObjectStoreGRPCClient implements the cloudprovider.ObjectStore interface and uses a // gRPC client to make calls to the plugin server. type ObjectStoreGRPCClient struct { + *xxxBase grpcClient proto.ObjectStoreClient - log *logrusAdapter - plugin string } -type objectStoreClientMux struct { - grpcClient proto.ObjectStoreClient - log *logrusAdapter - clients map[string]*ObjectStoreGRPCClient +func newObjectStoreGRPCClient() xxx { + return &ObjectStoreGRPCClient{xxxBase: &xxxBase{}} } -func (m *objectStoreClientMux) GetByName(name string) interface{} { - if client, found := m.clients[name]; found { - return client - } - client := &ObjectStoreGRPCClient{ - plugin: name, - grpcClient: m.grpcClient, - log: m.log, - } - m.clients[name] = client - return client +func (c *ObjectStoreGRPCClient) setGrpcClient(clientConn *grpc.ClientConn) { + c.grpcClient = proto.NewObjectStoreClient(clientConn) } // Init prepares the ObjectStore for usage using the provided map of @@ -210,27 +185,45 @@ func (c *ObjectStoreGRPCClient) SetLog(log logrus.FieldLogger) { c.log.impl = log } +////////////////////////////////////////////////////////////////////////////// +// server code +////////////////////////////////////////////////////////////////////////////// + +// GRPCServer registers an ObjectStore gRPC server. +func (p *ObjectStorePlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterObjectStoreServer(s, &ObjectStoreGRPCServer{mux: p.serverImplFactoryMux}) + return nil +} + // ObjectStoreGRPCServer implements the proto-generated ObjectStoreServer interface, and accepts // gRPC calls and forwards them to an implementation of the pluggable interface. type ObjectStoreGRPCServer struct { - mux map[string]func() cloudprovider.ObjectStore - impls map[string]cloudprovider.ObjectStore + mux *serverImplFactoryMux } -func (s *ObjectStoreGRPCServer) getImpl(name string) cloudprovider.ObjectStore { - if impl, found := s.impls[name]; found { - return impl +func (s *ObjectStoreGRPCServer) getImpl(name string) (cloudprovider.ObjectStore, error) { + impl, err := s.mux.getImpl(name) + if err != nil { + return nil, err + } + + itemAction, ok := impl.(cloudprovider.ObjectStore) + if !ok { + return nil, errors.Errorf("%T is not an object store", impl) } - f := s.mux[name] - s.impls[name] = f() - return s.impls[name] + + return itemAction, nil } // Init prepares the ObjectStore for usage using the provided map of // configuration key-value pairs. It returns an error if the ObjectStore // cannot be initialized from the provided config. func (s *ObjectStoreGRPCServer) Init(ctx context.Context, req *proto.InitRequest) (*proto.Empty, error) { - impl := s.getImpl(req.Plugin) + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + if err := impl.Init(req.Config); err != nil { return nil, err } @@ -248,7 +241,11 @@ func (s *ObjectStoreGRPCServer) PutObject(stream proto.ObjectStore_PutObjectServ return err } - impl := s.getImpl(firstChunk.Plugin) + impl, err := s.getImpl(firstChunk.Plugin) + if err != nil { + return err + } + bucket := firstChunk.Bucket key := firstChunk.Key @@ -280,7 +277,11 @@ func (s *ObjectStoreGRPCServer) PutObject(stream proto.ObjectStore_PutObjectServ // GetObject retrieves the object with the given key from the specified // bucket in object storage. func (s *ObjectStoreGRPCServer) GetObject(req *proto.GetObjectRequest, stream proto.ObjectStore_GetObjectServer) error { - impl := s.getImpl(req.Plugin) + impl, err := s.getImpl(req.Plugin) + if err != nil { + return err + } + rdr, err := impl.GetObject(req.Bucket, req.Key) if err != nil { return err @@ -306,7 +307,11 @@ func (s *ObjectStoreGRPCServer) GetObject(req *proto.GetObjectRequest, stream pr // before the provided delimiter (this is often used to simulate a directory // hierarchy in object storage). func (s *ObjectStoreGRPCServer) ListCommonPrefixes(ctx context.Context, req *proto.ListCommonPrefixesRequest) (*proto.ListCommonPrefixesResponse, error) { - impl := s.getImpl(req.Plugin) + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + prefixes, err := impl.ListCommonPrefixes(req.Bucket, req.Delimiter) if err != nil { return nil, err @@ -317,7 +322,11 @@ func (s *ObjectStoreGRPCServer) ListCommonPrefixes(ctx context.Context, req *pro // ListObjects gets a list of all objects in bucket that have the same prefix. func (s *ObjectStoreGRPCServer) ListObjects(ctx context.Context, req *proto.ListObjectsRequest) (*proto.ListObjectsResponse, error) { - impl := s.getImpl(req.Plugin) + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + keys, err := impl.ListObjects(req.Bucket, req.Prefix) if err != nil { return nil, err @@ -329,18 +338,25 @@ func (s *ObjectStoreGRPCServer) ListObjects(ctx context.Context, req *proto.List // DeleteObject removes object with the specified key from the given // bucket. func (s *ObjectStoreGRPCServer) DeleteObject(ctx context.Context, req *proto.DeleteObjectRequest) (*proto.Empty, error) { - impl := s.getImpl(req.Plugin) - err := impl.DeleteObject(req.Bucket, req.Key) + impl, err := s.getImpl(req.Plugin) if err != nil { return nil, err } + if err := impl.DeleteObject(req.Bucket, req.Key); err != nil { + return nil, err + } + return &proto.Empty{}, nil } // CreateSignedURL creates a pre-signed URL for the given bucket and key that expires after ttl. func (s *ObjectStoreGRPCServer) CreateSignedURL(ctx context.Context, req *proto.CreateSignedURLRequest) (*proto.CreateSignedURLResponse, error) { - impl := s.getImpl(req.Plugin) + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + url, err := impl.CreateSignedURL(req.Bucket, req.Key, time.Duration(req.Ttl)) if err != nil { return nil, err diff --git a/pkg/plugin/registry.go b/pkg/plugin/registry.go index 56498231a8..4b1563e116 100644 --- a/pkg/plugin/registry.go +++ b/pkg/plugin/registry.go @@ -11,6 +11,7 @@ import ( ) type Registry interface { + SkipInternalDiscovery() DiscoverPlugins() error List(kind PluginKind) []PluginIdentifier Get(kind PluginKind, name string) (PluginIdentifier, error) @@ -25,39 +26,74 @@ type kindAndName struct { // is registered as supporting multiple PluginKinds, it will be // gettable/listable for all of those kinds. type registry struct { - dir string - logger logrus.FieldLogger - // logLevel logrus.Level - pluginsByID map[kindAndName]PluginIdentifier - pluginsByKind map[PluginKind][]PluginIdentifier + dir string + skipInternalDiscovery bool + logger logrus.FieldLogger + logLevel logrus.Level + pluginsByID map[kindAndName]PluginIdentifier + pluginsByKind map[PluginKind][]PluginIdentifier } -func NewRegistry(dir string, logger logrus.FieldLogger /*, logLevel logrus.Level*/) Registry { +func NewRegistry(dir string, logger logrus.FieldLogger, logLevel logrus.Level) Registry { return ®istry{ - dir: dir, - logger: logger, - // logLevel: logLevel, + dir: dir, + logger: logger, + logLevel: logLevel, pluginsByID: make(map[kindAndName]PluginIdentifier), pluginsByKind: make(map[PluginKind][]PluginIdentifier), } } +func (r *registry) SkipInternalDiscovery() { + r.skipInternalDiscovery = true +} + func (r *registry) readPluginsDir() ([]string, error) { - if _, err := os.Stat(r.dir); err != nil { + return readPluginsDir(r.dir) +} + +func readPluginsDir(dir string) ([]string, error) { + if _, err := os.Stat(dir); err != nil { if os.IsNotExist(err) { return []string{}, nil } return nil, errors.WithStack(err) } - files, err := ioutil.ReadDir(r.dir) + files, err := ioutil.ReadDir(dir) if err != nil { return nil, errors.WithStack(err) } fullPaths := make([]string, 0, len(files)) for _, file := range files { - fullPaths = append(fullPaths, filepath.Join(r.dir, file.Name())) + fullPath := filepath.Join(dir, file.Name()) + + stat, err := os.Stat(fullPath) + if err != nil { + //TODO(ncdc) log or remove + //fmt.Printf("ERROR STATTING %s: %v\n", fullPath, err) + continue + } + + if stat.IsDir() { + subDirPaths, err := readPluginsDir(fullPath) + if err != nil { + return nil, err + } + fullPaths = append(fullPaths, subDirPaths...) + continue + } + + mode := stat.Mode() + // fmt.Printf("MODE: %v\n", mode) + if (mode & 0111) == 0 { + //TODO(ncdc) log or remove + // fmt.Printf("NOT EXECUTABLE: %s\n", fullPath) + continue + } + + fullPaths = append(fullPaths, fullPath) } return fullPaths, nil } @@ -68,7 +104,11 @@ func (r *registry) DiscoverPlugins() error { return err } - commands := []string{os.Args[0]} // ark itself + var commands []string + if !r.skipInternalDiscovery { + // ark's internal plugins + commands = append(commands, os.Args[0]) + } commands = append(commands, plugins...) for _, command := range commands { @@ -91,7 +131,7 @@ func (r *registry) DiscoverPlugins() error { } func (r *registry) listPlugins(command string) ([]PluginIdentifier, error) { - // logger := &logrusAdapter{impl: r.logger, level: r.logLevel} + logger := &logrusAdapter{impl: r.logger, level: r.logLevel} var args []string if command == os.Args[0] { @@ -99,8 +139,8 @@ func (r *registry) listPlugins(command string) ([]PluginIdentifier, error) { } builder := newClientBuilder(). - withCommand(command, args...) - // withLogger(logger) + withCommand(command, args...). + withLogger(logger) client := builder.client() diff --git a/pkg/plugin/restore_item_action.go b/pkg/plugin/restore_item_action.go index 361ab78597..f79891913a 100644 --- a/pkg/plugin/restore_item_action.go +++ b/pkg/plugin/restore_item_action.go @@ -27,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" api "github.com/heptio/ark/pkg/apis/ark/v1" proto "github.com/heptio/ark/pkg/plugin/generated" @@ -40,66 +39,40 @@ import ( type RestoreItemActionPlugin struct { plugin.NetRPCUnsupportedPlugin - mux map[string]func() restore.ItemAction + *serverImplFactoryMux + log *logrusAdapter } // NewRestoreItemActionPlugin constructs a RestoreItemActionPlugin. func NewRestoreItemActionPlugin() *RestoreItemActionPlugin { return &RestoreItemActionPlugin{ - mux: make(map[string]func() restore.ItemAction), + serverImplFactoryMux: newServerImplFactoryMux(), } } -func (p *RestoreItemActionPlugin) Add(name string, f func() restore.ItemAction) *RestoreItemActionPlugin { - p.mux[name] = f - return p -} - -func (p *RestoreItemActionPlugin) Names() []string { - return sets.StringKeySet(p.mux).List() -} - -// GRPCServer registers a RestoreItemAction gRPC server. -func (p *RestoreItemActionPlugin) GRPCServer(s *grpc.Server) error { - proto.RegisterRestoreItemActionServer(s, &RestoreItemActionGRPCServer{mux: p.mux, impls: make(map[string]restore.ItemAction)}) - return nil -} +////////////////////////////////////////////////////////////////////////////// +// client code +////////////////////////////////////////////////////////////////////////////// // GRPCClient returns a RestoreItemAction gRPC client. func (p *RestoreItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { - return &restoreItemClientMux{ - grpcClient: proto.NewRestoreItemActionClient(c), - log: p.log, - clients: make(map[string]*RestoreItemActionGRPCClient), - }, nil + return newClientMux(c, newRestoreItemActionGRPCClient), nil } // RestoreItemActionGRPCClient implements the backup/ItemAction interface and uses a // gRPC client to make calls to the plugin server. type RestoreItemActionGRPCClient struct { + *xxxBase grpcClient proto.RestoreItemActionClient - log *logrusAdapter - plugin string } -type restoreItemClientMux struct { - grpcClient proto.RestoreItemActionClient - log *logrusAdapter - clients map[string]*RestoreItemActionGRPCClient +func newRestoreItemActionGRPCClient() xxx { + return &RestoreItemActionGRPCClient{xxxBase: &xxxBase{}} } -func (m *restoreItemClientMux) GetByName(name string) interface{} { - if client, found := m.clients[name]; found { - return client - } - client := &RestoreItemActionGRPCClient{ - plugin: name, - grpcClient: m.grpcClient, - log: m.log, - } - m.clients[name] = client - return client +func (c *RestoreItemActionGRPCClient) setGrpcClient(clientConn *grpc.ClientConn) { + c.grpcClient = proto.NewRestoreItemActionClient(clientConn) } func (c *RestoreItemActionGRPCClient) AppliesTo() (restore.ResourceSelector, error) { @@ -156,24 +129,42 @@ func (c *RestoreItemActionGRPCClient) SetLog(log logrus.FieldLogger) { c.log.impl = log } +////////////////////////////////////////////////////////////////////////////// +// server code +////////////////////////////////////////////////////////////////////////////// + +// GRPCServer registers a RestoreItemAction gRPC server. +func (p *RestoreItemActionPlugin) GRPCServer(s *grpc.Server) error { + proto.RegisterRestoreItemActionServer(s, &RestoreItemActionGRPCServer{mux: p.serverImplFactoryMux}) + return nil +} + // RestoreItemActionGRPCServer implements the proto-generated RestoreItemActionServer interface, and accepts // gRPC calls and forwards them to an implementation of the pluggable interface. type RestoreItemActionGRPCServer struct { - mux map[string]func() restore.ItemAction - impls map[string]restore.ItemAction + mux *serverImplFactoryMux } -func (s *RestoreItemActionGRPCServer) getImpl(name string) restore.ItemAction { - if impl, found := s.impls[name]; found { - return impl +func (s *RestoreItemActionGRPCServer) getImpl(name string) (restore.ItemAction, error) { + impl, err := s.mux.getImpl(name) + if err != nil { + return nil, err + } + + itemAction, ok := impl.(restore.ItemAction) + if !ok { + return nil, errors.Errorf("%T is not a restore item action", impl) } - f := s.mux[name] - s.impls[name] = f() - return s.impls[name] + + return itemAction, nil } func (s *RestoreItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto.AppliesToRequest) (*proto.AppliesToResponse, error) { - impl := s.getImpl(req.Plugin) + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + appliesTo, err := impl.AppliesTo() if err != nil { return nil, err @@ -189,6 +180,11 @@ func (s *RestoreItemActionGRPCServer) AppliesTo(ctx context.Context, req *proto. } func (s *RestoreItemActionGRPCServer) Execute(ctx context.Context, req *proto.RestoreExecuteRequest) (*proto.RestoreExecuteResponse, error) { + impl, err := s.getImpl(req.Plugin) + if err != nil { + return nil, err + } + var ( item unstructured.Unstructured restore api.Restore @@ -202,7 +198,6 @@ func (s *RestoreItemActionGRPCServer) Execute(ctx context.Context, req *proto.Re return nil, err } - impl := s.getImpl(req.Plugin) res, warning, err := impl.Execute(&item, &restore) if err != nil { return nil, err diff --git a/pkg/plugin/server_impl_factory.go b/pkg/plugin/server_impl_factory.go new file mode 100644 index 0000000000..22f79f9a97 --- /dev/null +++ b/pkg/plugin/server_impl_factory.go @@ -0,0 +1,62 @@ +package plugin + +import ( + "github.com/heptio/ark/pkg/util/logging" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/sets" +) + +type ServerImplFactory func() (interface{}, error) + +type serverImplFactoryMux struct { + initializers map[string]ServerImplFactory + impls map[string]interface{} + serverLog logrus.FieldLogger +} + +func newServerImplFactoryMux() *serverImplFactoryMux { + return &serverImplFactoryMux{ + initializers: make(map[string]ServerImplFactory), + impls: make(map[string]interface{}), + } +} + +func (m *serverImplFactoryMux) register(name string, f ServerImplFactory) { + m.initializers[name] = f +} + +func (m *serverImplFactoryMux) names() []string { + return sets.StringKeySet(m.initializers).List() +} + +func (m *serverImplFactoryMux) setServerLog(log logrus.FieldLogger) { + m.serverLog = log +} + +// getImpl returns the implementation for a plugin with the given name. If an instance has already been initialized, +// that is returned. Otherwise, the instance is initialized by calling its initialization function. If the instance +// implements logging.LogSetter, getImpl calls SetLog and passes the plugin process's logger to the instance. +func (m *serverImplFactoryMux) getImpl(name string) (interface{}, error) { + if impl, found := m.impls[name]; found { + return impl, nil + } + + initializer, found := m.initializers[name] + if !found { + return nil, errors.Errorf("unknown plugin: %s", name) + } + + impl, err := initializer() + if err != nil { + return nil, err + } + + m.impls[name] = impl + + if logSetter, ok := impl.(logging.LogSetter); ok { + logSetter.SetLog(m.serverLog) + } + + return m.impls[name], nil +} diff --git a/pkg/plugin/shared.go b/pkg/plugin/shared.go index 21e1edc3fa..088e205f5c 100644 --- a/pkg/plugin/shared.go +++ b/pkg/plugin/shared.go @@ -20,9 +20,7 @@ import ( "os" plugin "github.com/hashicorp/go-plugin" - "github.com/heptio/ark/pkg/backup" - "github.com/heptio/ark/pkg/cloudprovider" - "github.com/heptio/ark/pkg/restore" + "github.com/sirupsen/logrus" ) // Handshake is configuration information that allows go-plugin @@ -34,60 +32,102 @@ var Handshake = plugin.HandshakeConfig{ } type SimpleServer interface { - RegisterBackupItemActions(map[string]func() backup.ItemAction) SimpleServer - RegisterBlockStores(map[string]func() cloudprovider.BlockStore) SimpleServer - RegisterObjectStores(map[string]func() cloudprovider.ObjectStore) SimpleServer - RegisterRestoreItemActions(map[string]func() restore.ItemAction) SimpleServer + RegisterBackupItemAction(name string, factory ServerImplFactory) SimpleServer + RegisterBackupItemActions(map[string]ServerImplFactory) SimpleServer + + RegisterBlockStore(name string, factory ServerImplFactory) SimpleServer + RegisterBlockStores(map[string]ServerImplFactory) SimpleServer + + RegisterObjectStore(name string, factory ServerImplFactory) SimpleServer + RegisterObjectStores(map[string]ServerImplFactory) SimpleServer + + RegisterRestoreItemAction(name string, factory ServerImplFactory) SimpleServer + RegisterRestoreItemActions(map[string]ServerImplFactory) SimpleServer + Serve() } type simpleServer struct { + log logrus.FieldLogger backupItemAction *BackupItemActionPlugin blockStore *BlockStorePlugin objectStore *ObjectStorePlugin restoreItemAction *RestoreItemActionPlugin } -func NewSimpleServer() SimpleServer { +func NewSimpleServer(log logrus.FieldLogger) SimpleServer { + backupItemAction := NewBackupItemActionPlugin() + backupItemAction.setServerLog(log) + + blockStore := NewBlockStorePlugin() + blockStore.setServerLog(log) + + objectStore := NewObjectStorePlugin() + objectStore.setServerLog(log) + + restoreItemAction := NewRestoreItemActionPlugin() + restoreItemAction.setServerLog(log) + return &simpleServer{ - backupItemAction: NewBackupItemActionPlugin(), - blockStore: NewBlockStorePlugin(), - objectStore: NewObjectStorePlugin(), - restoreItemAction: NewRestoreItemActionPlugin(), + log: log, + backupItemAction: backupItemAction, + blockStore: blockStore, + objectStore: objectStore, + restoreItemAction: restoreItemAction, } } -func (s *simpleServer) RegisterBackupItemActions(m map[string]func() backup.ItemAction) SimpleServer { +func (s *simpleServer) RegisterBackupItemAction(name string, factory ServerImplFactory) SimpleServer { + s.backupItemAction.register(name, factory) + return s +} + +func (s *simpleServer) RegisterBackupItemActions(m map[string]ServerImplFactory) SimpleServer { for name := range m { - s.backupItemAction.Add(name, m[name]) + s.RegisterBackupItemAction(name, m[name]) } return s } -func (s *simpleServer) RegisterBlockStores(m map[string]func() cloudprovider.BlockStore) SimpleServer { +func (s *simpleServer) RegisterBlockStore(name string, factory ServerImplFactory) SimpleServer { + s.blockStore.register(name, factory) + return s +} + +func (s *simpleServer) RegisterBlockStores(m map[string]ServerImplFactory) SimpleServer { for name := range m { - s.blockStore.Add(name, m[name]) + s.RegisterBlockStore(name, m[name]) } return s } -func (s *simpleServer) RegisterObjectStores(m map[string]func() cloudprovider.ObjectStore) SimpleServer { +func (s *simpleServer) RegisterObjectStore(name string, factory ServerImplFactory) SimpleServer { + s.objectStore.register(name, factory) + return s +} + +func (s *simpleServer) RegisterObjectStores(m map[string]ServerImplFactory) SimpleServer { for name := range m { - s.objectStore.Add(name, m[name]) + s.RegisterObjectStore(name, m[name]) } return s } -func (s *simpleServer) RegisterRestoreItemActions(m map[string]func() restore.ItemAction) SimpleServer { +func (s *simpleServer) RegisterRestoreItemAction(name string, factory ServerImplFactory) SimpleServer { + s.restoreItemAction.register(name, factory) + return s +} + +func (s *simpleServer) RegisterRestoreItemActions(m map[string]ServerImplFactory) SimpleServer { for name := range m { - s.restoreItemAction.Add(name, m[name]) + s.RegisterRestoreItemAction(name, m[name]) } return s } func getNames(command string, kind PluginKind, plugin Interface) []PluginIdentifier { var pluginIdentifiers []PluginIdentifier - for _, name := range plugin.Names() { + for _, name := range plugin.names() { id := PluginIdentifier{Command: command, Kind: kind, Name: name} pluginIdentifiers = append(pluginIdentifiers, id) } @@ -96,6 +136,10 @@ func getNames(command string, kind PluginKind, plugin Interface) []PluginIdentif // Serve serves the plugin p. func (s *simpleServer) Serve() { + defer func() { + s.log.Error("ANDY ANDY ANDY") + }() + command := os.Args[0] var pluginIdentifiers []PluginIdentifier diff --git a/pkg/plugin/wrapper.go b/pkg/plugin/wrapper.go index d8b8ed2c2d..b0094c7156 100644 --- a/pkg/plugin/wrapper.go +++ b/pkg/plugin/wrapper.go @@ -7,6 +7,7 @@ import ( plugin "github.com/hashicorp/go-plugin" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "google.golang.org/grpc" ) // wrapper encapsulates the lifecycle for all plugins contained in a single executable file. It is able to restart a @@ -158,7 +159,7 @@ func (w *wrapper) dispenseLH(key kindAndName) (interface{}, error) { return nil, errors.WithStack(err) } - if mux, ok := dispensed.(clientMux); ok { + if mux, ok := dispensed.(*clientMuxImpl); ok { dispensed = mux.clientFor(key.name) } @@ -169,11 +170,60 @@ func (w *wrapper) dispenseLH(key kindAndName) (interface{}, error) { } // clientMux allows a dispensed plugin to support multiple implementations, such as AWS and GCP object stores. -type clientMux interface { - // clientFor returns a gRPC client for the plugin named name. Note, the return type is interface{} because there - // isn't an interface for a gRPC client itself; the returned object must implement one of our plugin interfaces, - // such as ObjectStore. - clientFor(name string) interface{} +// type clientMux interface { +// // clientFor returns a gRPC client for the plugin named name. Note, the return type is interface{} because there +// // isn't an interface for a gRPC client itself; the returned object must implement one of our plugin interfaces, +// // such as ObjectStore. +// clientFor(name string) interface{} +// } + +type xxx interface { + setPlugin(name string) + setGrpcClient(clientConn *grpc.ClientConn) + setLog(log *logrusAdapter) +} + +type xxxBase struct { + plugin string + log *logrusAdapter +} + +func (x *xxxBase) setPlugin(name string) { + x.plugin = name +} + +func (x *xxxBase) setLog(log *logrusAdapter) { + x.log = log +} + +type clientMuxImpl struct { + clientConn *grpc.ClientConn + log *logrusAdapter + initFunc func() xxx + clients map[string]xxx +} + +func newClientMux(clientConn *grpc.ClientConn, initFunc func() xxx) *clientMuxImpl { + m := &clientMuxImpl{ + clientConn: clientConn, + initFunc: initFunc, + clients: make(map[string]xxx), + } + + return m +} + +func (m *clientMuxImpl) clientFor(name string) interface{} { + if client, found := m.clients[name]; found { + return client + } + + client := m.initFunc() + client.setPlugin(name) + client.setGrpcClient(m.clientConn) + client.setLog(m.log) + m.clients[name] = client + return client } // stop terminates the plugin process. diff --git a/pkg/restore/job_action.go b/pkg/restore/job_action.go index e81219b544..ef702187c5 100644 --- a/pkg/restore/job_action.go +++ b/pkg/restore/job_action.go @@ -29,10 +29,12 @@ type jobAction struct { logger logrus.FieldLogger } -func NewJobAction(logger logrus.FieldLogger) ItemAction { - return &jobAction{ - logger: logger, - } +func NewJobAction() ItemAction { + return &jobAction{} +} + +func (a *jobAction) SetLog(log logrus.FieldLogger) { + a.logger = log } func (a *jobAction) AppliesTo() (ResourceSelector, error) { diff --git a/pkg/restore/pod_action.go b/pkg/restore/pod_action.go index 6d0b8b5cd1..fc4c3a891d 100644 --- a/pkg/restore/pod_action.go +++ b/pkg/restore/pod_action.go @@ -31,10 +31,12 @@ type podAction struct { logger logrus.FieldLogger } -func NewPodAction(logger logrus.FieldLogger) ItemAction { - return &podAction{ - logger: logger, - } +func NewPodAction() ItemAction { + return &podAction{} +} + +func (a *podAction) SetLog(log logrus.FieldLogger) { + a.logger = log } func (a *podAction) AppliesTo() (ResourceSelector, error) { diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index 95d4cbc3e6..78be9c2a49 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -64,9 +64,9 @@ type kindString string // kubernetesRestorer implements Restorer for restoring into a Kubernetes cluster. type kubernetesRestorer struct { - discoveryHelper discovery.Helper - dynamicFactory client.DynamicFactory - backupService cloudprovider.BackupService + discoveryHelper discovery.Helper + dynamicFactory client.DynamicFactory + // backupService cloudprovider.BackupService snapshotService cloudprovider.SnapshotService backupClient arkv1client.BackupsGetter namespaceClient corev1.NamespaceInterface @@ -137,7 +137,7 @@ func prioritizeResources(helper discovery.Helper, priorities []string, includedR func NewKubernetesRestorer( discoveryHelper discovery.Helper, dynamicFactory client.DynamicFactory, - backupService cloudprovider.BackupService, + // backupService cloudprovider.BackupService, snapshotService cloudprovider.SnapshotService, resourcePriorities []string, backupClient arkv1client.BackupsGetter, @@ -147,7 +147,6 @@ func NewKubernetesRestorer( return &kubernetesRestorer{ discoveryHelper: discoveryHelper, dynamicFactory: dynamicFactory, - backupService: backupService, snapshotService: snapshotService, backupClient: backupClient, namespaceClient: namespaceClient, diff --git a/pkg/restore/service_action.go b/pkg/restore/service_action.go index 3f48b91de9..8fd8d0e513 100644 --- a/pkg/restore/service_action.go +++ b/pkg/restore/service_action.go @@ -29,10 +29,12 @@ type serviceAction struct { log logrus.FieldLogger } -func NewServiceAction(log logrus.FieldLogger) ItemAction { - return &serviceAction{ - log: log, - } +func NewServiceAction() ItemAction { + return &serviceAction{} +} + +func (a *serviceAction) SetLog(log logrus.FieldLogger) { + a.log = log } func (a *serviceAction) AppliesTo() (ResourceSelector, error) {