From 07e24870e27d070d1499f36c1ae9e4f58b348430 Mon Sep 17 00:00:00 2001 From: Michael Weibel Date: Wed, 18 Nov 2020 14:55:56 +0100 Subject: [PATCH 1/2] fix: only add ResticPassword if set --- api/v1alpha1/backend.go | 6 +++-- executor/generic.go | 1 - executor/generic_test.go | 52 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 executor/generic_test.go diff --git a/api/v1alpha1/backend.go b/api/v1alpha1/backend.go index ecf5c6cd9..d4dc01b1f 100644 --- a/api/v1alpha1/backend.go +++ b/api/v1alpha1/backend.go @@ -23,8 +23,10 @@ type Backend struct { func (b *Backend) GetCredentialEnv() map[string]*corev1.EnvVarSource { vars := make(map[string]*corev1.EnvVarSource) - vars[constants.ResticPasswordEnvName] = &corev1.EnvVarSource{ - SecretKeyRef: b.RepoPasswordSecretRef, + if b.RepoPasswordSecretRef != nil { + vars[constants.ResticPasswordEnvName] = &corev1.EnvVarSource{ + SecretKeyRef: b.RepoPasswordSecretRef, + } } if b.Azure != nil { diff --git a/executor/generic.go b/executor/generic.go index cf866cf84..41e9af5b6 100644 --- a/executor/generic.go +++ b/executor/generic.go @@ -130,7 +130,6 @@ func DefaultEnv(namespace string) EnvVarConverter { defaults := NewEnvVarConverter() defaults.SetString("STATS_URL", constants.GetGlobalStatsURL()) - defaults.SetString(constants.ResticPasswordEnvName, constants.GetGlobalRepoPassword()) defaults.SetString(constants.ResticRepositoryEnvName, fmt.Sprintf("s3:%s/%s", constants.GetGlobalS3Endpoint(), constants.GetGlobalS3Bucket())) defaults.SetString(constants.ResticPasswordEnvName, constants.GetGlobalRepoPassword()) defaults.SetString(constants.AwsAccessKeyIDEnvName, constants.GetGlobalAccessKey()) diff --git a/executor/generic_test.go b/executor/generic_test.go new file mode 100644 index 000000000..4d3a978cc --- /dev/null +++ b/executor/generic_test.go @@ -0,0 +1,52 @@ +package executor + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" +) + +func TestEnvVarConverter_Merge(t *testing.T) { + vars := NewEnvVarConverter() + vars.SetString("nooverridestr", "original") + vars.SetEnvVarSource("nooverridesrc", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "original"}}) + vars.SetString("nomergestr", "original") + vars.SetEnvVarSource("nomergesource", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "original"}}) + + src := NewEnvVarConverter() + src.SetString("nooverridestr", "updated") + src.SetEnvVarSource("nooverridestr", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "updated"}}) + src.SetEnvVarSource("nooverridesrc", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "updated"}}) + src.SetString("newstr", "original") + src.SetEnvVarSource("newsource", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "original"}}) + + if err := vars.Merge(src); err != nil { + t.Errorf("unable to merge: %v", err) + } + + v := vars.Vars + + if *v["nooverridestr"].stringEnv != "original" { + t.Error("nooverridestr should not have been updated.") + } + if v["nooverridestr"].envVarSource != nil { + t.Error("nooverridestr should not have been updated.") + } + if v["nooverridesrc"].envVarSource.SecretKeyRef.Key != "original" { + t.Error("nooverridesrc should not have been updated.") + } + + if *v["nomergestr"].stringEnv != "original" { + t.Error("nomergestr should not have been updated.") + } + if v["nomergesource"].envVarSource.SecretKeyRef.Key != "original" { + t.Error("nomergesource should not have been updated.") + } + + if *v["newstr"].stringEnv != "original" { + t.Error("newstr should have been merged in.") + } + if v["newsource"].envVarSource.SecretKeyRef.Key != "original" { + t.Error("nomergesource should have been merged in.") + } +} From 07a6376bb56426181b75ccb7d1df352942f355f0 Mon Sep 17 00:00:00 2001 From: Michael Weibel Date: Thu, 19 Nov 2020 16:50:54 +0100 Subject: [PATCH 2/2] feat: deduplicate --- api/v1alpha1/archive_types.go | 4 +-- api/v1alpha1/backup_types.go | 4 +-- api/v1alpha1/check_types.go | 4 +-- api/v1alpha1/job_types.go | 13 +++++++ api/v1alpha1/prune_types.go | 4 +-- api/v1alpha1/restore_types.go | 4 +-- api/v1alpha1/schedule_types.go | 4 +-- constants/constants.go | 5 +++ controllers/archive_controller.go | 7 +++- controllers/backup_controller.go | 7 +++- controllers/check_controller.go | 8 ++++- controllers/job_controller.go | 2 +- controllers/prune_controller.go | 7 +++- controllers/restore_controller.go | 7 +++- controllers/schedule_controller.go | 7 +++- executor/restore_test.go | 2 +- executor/worker.go | 25 ++++++++------ handler/generic.go | 4 ++- handler/job.go | 7 ++-- job/job.go | 15 ++++---- queue/execution_test.go | 55 ++++++++++++++++++++++++++++++ 21 files changed, 153 insertions(+), 42 deletions(-) create mode 100644 api/v1alpha1/job_types.go create mode 100644 queue/execution_test.go diff --git a/api/v1alpha1/archive_types.go b/api/v1alpha1/archive_types.go index 36cc93a23..a5640cb32 100644 --- a/api/v1alpha1/archive_types.go +++ b/api/v1alpha1/archive_types.go @@ -74,8 +74,8 @@ func (a *Archive) GetMetaObject() metav1.Object { return a } -func (*Archive) GetType() string { - return "archive" +func (*Archive) GetType() Type { + return ArchiveType } func (a *Archive) GetK8upStatus() *K8upStatus { diff --git a/api/v1alpha1/backup_types.go b/api/v1alpha1/backup_types.go index 22f8cf8c1..30e0bff8b 100644 --- a/api/v1alpha1/backup_types.go +++ b/api/v1alpha1/backup_types.go @@ -97,8 +97,8 @@ func (b *Backup) GetMetaObject() metav1.Object { return b } -func (*Backup) GetType() string { - return "backup" +func (*Backup) GetType() Type { + return BackupType } func (b *Backup) GetK8upStatus() *K8upStatus { diff --git a/api/v1alpha1/check_types.go b/api/v1alpha1/check_types.go index b2f310a00..acad100a0 100644 --- a/api/v1alpha1/check_types.go +++ b/api/v1alpha1/check_types.go @@ -67,8 +67,8 @@ func (c *Check) GetMetaObject() metav1.Object { return c } -func (c *Check) GetType() string { - return "check" +func (c *Check) GetType() Type { + return CheckType } func (c *Check) GetK8upStatus() *K8upStatus { diff --git a/api/v1alpha1/job_types.go b/api/v1alpha1/job_types.go new file mode 100644 index 000000000..767c5f90e --- /dev/null +++ b/api/v1alpha1/job_types.go @@ -0,0 +1,13 @@ +package v1alpha1 + +// Type defines what job type this is. +type Type string + +const ( + BackupType Type = "backup" + CheckType Type = "check" + ArchiveType Type = "archive" + RestoreType Type = "restore" + PruneType Type = "prune" + ScheduleType Type = "schedule" +) diff --git a/api/v1alpha1/prune_types.go b/api/v1alpha1/prune_types.go index d55ede3af..37aa80bad 100644 --- a/api/v1alpha1/prune_types.go +++ b/api/v1alpha1/prune_types.go @@ -80,8 +80,8 @@ func (p *Prune) GetMetaObject() metav1.Object { return p } -func (p *Prune) GetType() string { - return "prune" +func (p *Prune) GetType() Type { + return PruneType } func (p *Prune) GetK8upStatus() *K8upStatus { diff --git a/api/v1alpha1/restore_types.go b/api/v1alpha1/restore_types.go index 187a4c5e3..8d786db1b 100644 --- a/api/v1alpha1/restore_types.go +++ b/api/v1alpha1/restore_types.go @@ -81,8 +81,8 @@ func (r *Restore) GetMetaObject() metav1.Object { return r } -func (r *Restore) GetType() string { - return "restore" +func (r *Restore) GetType() Type { + return RestoreType } func (r *Restore) GetK8upStatus() *K8upStatus { diff --git a/api/v1alpha1/schedule_types.go b/api/v1alpha1/schedule_types.go index 57d5396d9..9bd539e2e 100644 --- a/api/v1alpha1/schedule_types.go +++ b/api/v1alpha1/schedule_types.go @@ -116,8 +116,8 @@ func (s *Schedule) GetMetaObject() metav1.Object { return s } -func (*Schedule) GetType() string { - return "schedule" +func (*Schedule) GetType() Type { + return ScheduleType } func (s *Schedule) GetK8upStatus() *K8upStatus { diff --git a/constants/constants.go b/constants/constants.go index 354c290a7..113f86fe2 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -5,6 +5,7 @@ package constants import ( + "fmt" "os" "strconv" ) @@ -82,6 +83,10 @@ func GetGlobalS3Bucket() string { return globalS3Bucket } +func GetGlobalRepository() string { + return fmt.Sprintf("s3:%s/%s", GetGlobalS3Endpoint(), GetGlobalS3Bucket()) +} + func GetGlobalRestoreS3SecretAccessKey() string { return globalRestoreS3SecretAccessKey } diff --git a/controllers/archive_controller.go b/controllers/archive_controller.go index a0be3682b..8ebe7fa91 100644 --- a/controllers/archive_controller.go +++ b/controllers/archive_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1" + "github.com/vshn/k8up/constants" "github.com/vshn/k8up/handler" "github.com/vshn/k8up/job" ) @@ -55,7 +56,11 @@ func (r *ArchiveReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - config := job.NewConfig(ctx, r.Client, log, archive, r.Scheme) + repository := constants.GetGlobalRepository() + if archive.Spec.Backend != nil { + repository = archive.Spec.Backend.String() + } + config := job.NewConfig(ctx, r.Client, log, archive, r.Scheme, repository) archiveHandler := handler.NewHandler(config) return ctrl.Result{RequeueAfter: time.Second * 30}, archiveHandler.Handle() diff --git a/controllers/backup_controller.go b/controllers/backup_controller.go index f8d3f6363..b4410e270 100644 --- a/controllers/backup_controller.go +++ b/controllers/backup_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1" + "github.com/vshn/k8up/constants" "github.com/vshn/k8up/handler" "github.com/vshn/k8up/job" ) @@ -57,7 +58,11 @@ func (r *BackupReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - config := job.NewConfig(ctx, r.Client, log, backup, r.Scheme) + repository := constants.GetGlobalRepository() + if backup.Spec.Backend != nil { + repository = backup.Spec.Backend.String() + } + config := job.NewConfig(ctx, r.Client, log, backup, r.Scheme, repository) backupHandler := handler.NewHandler(config) diff --git a/controllers/check_controller.go b/controllers/check_controller.go index 1cc412413..7cf40f875 100644 --- a/controllers/check_controller.go +++ b/controllers/check_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1" + "github.com/vshn/k8up/constants" "github.com/vshn/k8up/handler" "github.com/vshn/k8up/job" ) @@ -55,7 +56,12 @@ func (r *CheckReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - config := job.NewConfig(ctx, r.Client, logger, check, r.Scheme) + repository := constants.GetGlobalRepository() + if check.Spec.Backend != nil { + repository = check.Spec.Backend.String() + } + + config := job.NewConfig(ctx, r.Client, logger, check, r.Scheme, repository) checkHandler := handler.NewHandler(config) diff --git a/controllers/job_controller.go b/controllers/job_controller.go index 90570be17..532343da7 100644 --- a/controllers/job_controller.go +++ b/controllers/job_controller.go @@ -56,7 +56,7 @@ func (r *JobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return reconcile.Result{}, err } - config := job.NewConfig(ctx, r.Client, log, nil, r.Scheme) + config := job.NewConfig(ctx, r.Client, log, nil, r.Scheme, "") return ctrl.Result{}, handler.NewJobHandler(config, jobObj).Handle() } diff --git a/controllers/prune_controller.go b/controllers/prune_controller.go index 32284b560..aa5ae1962 100644 --- a/controllers/prune_controller.go +++ b/controllers/prune_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1" + "github.com/vshn/k8up/constants" "github.com/vshn/k8up/handler" "github.com/vshn/k8up/job" ) @@ -58,7 +59,11 @@ func (r *PruneReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, nil } - config := job.NewConfig(ctx, r.Client, log, prune, r.Scheme) + repository := constants.GetGlobalRepository() + if prune.Spec.Backend != nil { + repository = prune.Spec.Backend.String() + } + config := job.NewConfig(ctx, r.Client, log, prune, r.Scheme, repository) pruneHandler := handler.NewHandler(config) diff --git a/controllers/restore_controller.go b/controllers/restore_controller.go index a24b11820..4c2c49d22 100644 --- a/controllers/restore_controller.go +++ b/controllers/restore_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1" + "github.com/vshn/k8up/constants" "github.com/vshn/k8up/handler" "github.com/vshn/k8up/job" ) @@ -58,7 +59,11 @@ func (r *RestoreReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, nil } - config := job.NewConfig(ctx, r.Client, log, restore, r.Scheme) + repository := constants.GetGlobalRepository() + if restore.Spec.Backend != nil { + repository = restore.Spec.Backend.String() + } + config := job.NewConfig(ctx, r.Client, log, restore, r.Scheme, repository) restoreHandler := handler.NewHandler(config) diff --git a/controllers/schedule_controller.go b/controllers/schedule_controller.go index 3bd0114f9..c41a8d6fc 100644 --- a/controllers/schedule_controller.go +++ b/controllers/schedule_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1" + "github.com/vshn/k8up/constants" "github.com/vshn/k8up/handler" "github.com/vshn/k8up/job" ) @@ -55,7 +56,11 @@ func (r *ScheduleReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return reconcile.Result{}, err } - config := job.NewConfig(ctx, r.Client, log, schedule, r.Scheme) + repository := constants.GetGlobalRepository() + if schedule.Spec.Backend != nil { + repository = schedule.Spec.Backend.String() + } + config := job.NewConfig(ctx, r.Client, log, schedule, r.Scheme, repository) return ctrl.Result{}, handler.NewScheduleHandler(config, schedule).Handle() } diff --git a/executor/restore_test.go b/executor/restore_test.go index 5204f42f9..53472d58a 100644 --- a/executor/restore_test.go +++ b/executor/restore_test.go @@ -296,6 +296,6 @@ func jobMatcher(restoreType string, additionalArgs []string, env Elements, volum } func newConfig() *job.Config { - cfg := job.NewConfig(context.TODO(), nil, nil, &k8upv1alpha1.Restore{}, scheme) + cfg := job.NewConfig(context.TODO(), nil, nil, &k8upv1alpha1.Restore{}, scheme, "") return &cfg } diff --git a/executor/worker.go b/executor/worker.go index a7bd36345..6dafc8163 100644 --- a/executor/worker.go +++ b/executor/worker.go @@ -56,19 +56,22 @@ func (qe *QueueWorker) loopRepositoryJobs(repository string) { shouldRun = !observer.GetObserver().IsExclusiveJobRunning(repository) } - if shouldRun { - err := job.Execute() - if err != nil { - if !errors.IsAlreadyExists(err) { - job.Logger().Error(err, "cannot create job", "repository", repository) - } - } + if !shouldRun { + job.Logger().Info("skipping job due to exclusivity", "exclusive", job.Exclusive(), "repository", job.GetRepository()) + continue + } - // Skip the rest for this repository if we just started an exclusive - // job. - if job.Exclusive() { - return + err := job.Execute() + if err != nil { + if !errors.IsAlreadyExists(err) { + job.Logger().Error(err, "cannot create job", "repository", repository) } } + + // Skip the rest for this repository if we just started an exclusive + // job. + if job.Exclusive() { + return + } } } diff --git a/handler/generic.go b/handler/generic.go index d39a2007e..c51b470a5 100644 --- a/handler/generic.go +++ b/handler/generic.go @@ -30,7 +30,9 @@ func (h *Handler) Handle() error { func (h *Handler) queueJob() error { h.Log.Info("adding job to the queue") - queue.GetExecQueue().Add(executor.NewExecutor(h.Config)) + e := executor.NewExecutor(h.Config) + + queue.GetExecQueue().Add(e) return nil } diff --git a/handler/job.go b/handler/job.go index 4816d572f..641d0bad7 100644 --- a/handler/job.go +++ b/handler/job.go @@ -76,9 +76,10 @@ func (j *JobHandler) Handle() error { } oj := observer.ObservableJob{ - Job: j.job, - Exclusive: exclusive, - Event: jobEvent, + Job: j.job, + Exclusive: exclusive, + Event: jobEvent, + Repository: j.Repository, } observer.GetObserver().GetUpdateChannel() <- oj diff --git a/job/job.go b/job/job.go index 09401752d..51cde6731 100644 --- a/job/job.go +++ b/job/job.go @@ -41,17 +41,18 @@ type Object interface { GetMetaObject() metav1.Object GetRuntimeObject() runtime.Object GetK8upStatus() *k8upv1alpha1.K8upStatus - GetType() string + GetType() k8upv1alpha1.Type } // NewConfig returns a new configuration. -func NewConfig(ctx context.Context, client client.Client, log logr.Logger, obj Object, scheme *runtime.Scheme) Config { +func NewConfig(ctx context.Context, client client.Client, log logr.Logger, obj Object, scheme *runtime.Scheme, repository string) Config { return Config{ - Client: client, - Log: log, - CTX: ctx, - Obj: obj, - Scheme: scheme, + Client: client, + Log: log, + CTX: ctx, + Obj: obj, + Scheme: scheme, + Repository: repository, } } diff --git a/queue/execution_test.go b/queue/execution_test.go new file mode 100644 index 000000000..0d938bd30 --- /dev/null +++ b/queue/execution_test.go @@ -0,0 +1,55 @@ +package queue + +import ( + "errors" + "testing" + + "github.com/go-logr/logr" +) + +type mockExecutor struct { + exclusive bool + repository string +} + +func (m *mockExecutor) Execute() error { + return errors.New("not implemented") +} +func (m *mockExecutor) Exclusive() bool { + return m.exclusive +} +func (m *mockExecutor) Logger() logr.Logger { + return nil +} +func (m *mockExecutor) GetRepository() string { + return m.repository +} + +func TestExecutionQueue(t *testing.T) { + q := newExecutionQueue() + + if !q.IsEmpty("repo1") || !q.IsEmpty("repo2") || !q.IsEmpty("") { + t.Fatal("queue is supposed to be empty") + } + + m1 := &mockExecutor{false, "repo1"} + q.Add(m1) + m2 := &mockExecutor{true, "repo1"} + q.Add(m2) + m3 := &mockExecutor{true, "repo2"} + q.Add(m3) + + a1 := q.Get("repo1") + a2 := q.Get("repo1") + a3 := q.Get("repo2") + + if a1 != m2 { + t.Error("expected to retrieve exclusive executor first") + } + if a2 != m1 { + t.Error("expected to retrieve non-exclusive executor second") + } + if a3 != m3 { + t.Error("expected to retrieve repo2") + } +}