Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate prune #148

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/v1alpha1/archive_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (a *Archive) GetMetaObject() metav1.Object {
return a
}

func (*Archive) GetType() string {
return "archive"
func (*Archive) GetType() Type {
return ArchiveType
}

func (a *Archive) GetK8upStatus() *K8upStatus {
Expand Down
6 changes: 4 additions & 2 deletions api/v1alpha1/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ type Backend struct {
func (b *Backend) GetCredentialEnv() map[string]*corev1.EnvVarSource {
vars := make(map[string]*corev1.EnvVarSource)

vars[constants.ResticPasswordEnvName] = &corev1.EnvVarSource{
SecretKeyRef: b.RepoPasswordSecretRef,
if b.RepoPasswordSecretRef != nil {
vars[constants.ResticPasswordEnvName] = &corev1.EnvVarSource{
SecretKeyRef: b.RepoPasswordSecretRef,
}
}

if b.Azure != nil {
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/backup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func (b *Backup) GetMetaObject() metav1.Object {
return b
}

func (*Backup) GetType() string {
return "backup"
func (*Backup) GetType() Type {
return BackupType
}

func (b *Backup) GetK8upStatus() *K8upStatus {
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/check_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func (c *Check) GetMetaObject() metav1.Object {
return c
}

func (c *Check) GetType() string {
return "check"
func (c *Check) GetType() Type {
return CheckType
}

func (c *Check) GetK8upStatus() *K8upStatus {
Expand Down
13 changes: 13 additions & 0 deletions api/v1alpha1/job_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package v1alpha1

// Type defines what job type this is.
type Type string
Comment on lines +3 to +4
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is highly confusing with the built-in Go's "type"

Suggested change
// Type defines what job type this is.
type Type string
// JobType defines what kind a certain job is.
type JobType string


const (
BackupType Type = "backup"
CheckType Type = "check"
ArchiveType Type = "archive"
RestoreType Type = "restore"
PruneType Type = "prune"
ScheduleType Type = "schedule"
)
4 changes: 2 additions & 2 deletions api/v1alpha1/prune_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func (p *Prune) GetMetaObject() metav1.Object {
return p
}

func (p *Prune) GetType() string {
return "prune"
func (p *Prune) GetType() Type {
return PruneType
}

func (p *Prune) GetK8upStatus() *K8upStatus {
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/restore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (r *Restore) GetMetaObject() metav1.Object {
return r
}

func (r *Restore) GetType() string {
return "restore"
func (r *Restore) GetType() Type {
return RestoreType
}

func (r *Restore) GetK8upStatus() *K8upStatus {
Expand Down
4 changes: 2 additions & 2 deletions api/v1alpha1/schedule_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func (s *Schedule) GetMetaObject() metav1.Object {
return s
}

func (*Schedule) GetType() string {
return "schedule"
func (*Schedule) GetType() Type {
return ScheduleType
}

func (s *Schedule) GetK8upStatus() *K8upStatus {
Expand Down
5 changes: 5 additions & 0 deletions constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package constants

import (
"fmt"
"os"
"strconv"
)
Expand Down Expand Up @@ -82,6 +83,10 @@ func GetGlobalS3Bucket() string {
return globalS3Bucket
}

func GetGlobalRepository() string {
return fmt.Sprintf("s3:%s/%s", GetGlobalS3Endpoint(), GetGlobalS3Bucket())
}

func GetGlobalRestoreS3SecretAccessKey() string {
return globalRestoreS3SecretAccessKey
}
Expand Down
7 changes: 6 additions & 1 deletion controllers/archive_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
"github.com/vshn/k8up/constants"
"github.com/vshn/k8up/handler"
"github.com/vshn/k8up/job"
)
Expand Down Expand Up @@ -55,7 +56,11 @@ func (r *ArchiveReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}

config := job.NewConfig(ctx, r.Client, log, archive, r.Scheme)
repository := constants.GetGlobalRepository()
if archive.Spec.Backend != nil {
repository = archive.Spec.Backend.String()
}
config := job.NewConfig(ctx, r.Client, log, archive, r.Scheme, repository)

archiveHandler := handler.NewHandler(config)
return ctrl.Result{RequeueAfter: time.Second * 30}, archiveHandler.Handle()
Expand Down
7 changes: 6 additions & 1 deletion controllers/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
"github.com/vshn/k8up/constants"
"github.com/vshn/k8up/handler"
"github.com/vshn/k8up/job"
)
Expand Down Expand Up @@ -57,7 +58,11 @@ func (r *BackupReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}

config := job.NewConfig(ctx, r.Client, log, backup, r.Scheme)
repository := constants.GetGlobalRepository()
if backup.Spec.Backend != nil {
repository = backup.Spec.Backend.String()
}
config := job.NewConfig(ctx, r.Client, log, backup, r.Scheme, repository)

backupHandler := handler.NewHandler(config)

Expand Down
8 changes: 7 additions & 1 deletion controllers/check_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
"github.com/vshn/k8up/constants"
"github.com/vshn/k8up/handler"
"github.com/vshn/k8up/job"
)
Expand Down Expand Up @@ -55,7 +56,12 @@ func (r *CheckReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}

config := job.NewConfig(ctx, r.Client, logger, check, r.Scheme)
repository := constants.GetGlobalRepository()
if check.Spec.Backend != nil {
repository = check.Spec.Backend.String()
}

config := job.NewConfig(ctx, r.Client, logger, check, r.Scheme, repository)

checkHandler := handler.NewHandler(config)

Expand Down
2 changes: 1 addition & 1 deletion controllers/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (r *JobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return reconcile.Result{}, err
}

config := job.NewConfig(ctx, r.Client, log, nil, r.Scheme)
config := job.NewConfig(ctx, r.Client, log, nil, r.Scheme, "")

return ctrl.Result{}, handler.NewJobHandler(config, jobObj).Handle()
}
Expand Down
7 changes: 6 additions & 1 deletion controllers/prune_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
"github.com/vshn/k8up/constants"
"github.com/vshn/k8up/handler"
"github.com/vshn/k8up/job"
)
Expand Down Expand Up @@ -58,7 +59,11 @@ func (r *PruneReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil
}

config := job.NewConfig(ctx, r.Client, log, prune, r.Scheme)
repository := constants.GetGlobalRepository()
if prune.Spec.Backend != nil {
repository = prune.Spec.Backend.String()
}
config := job.NewConfig(ctx, r.Client, log, prune, r.Scheme, repository)

pruneHandler := handler.NewHandler(config)

Expand Down
7 changes: 6 additions & 1 deletion controllers/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
"github.com/vshn/k8up/constants"
"github.com/vshn/k8up/handler"
"github.com/vshn/k8up/job"
)
Expand Down Expand Up @@ -58,7 +59,11 @@ func (r *RestoreReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil
}

config := job.NewConfig(ctx, r.Client, log, restore, r.Scheme)
repository := constants.GetGlobalRepository()
if restore.Spec.Backend != nil {
repository = restore.Spec.Backend.String()
}
config := job.NewConfig(ctx, r.Client, log, restore, r.Scheme, repository)

restoreHandler := handler.NewHandler(config)

Expand Down
7 changes: 6 additions & 1 deletion controllers/schedule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
"github.com/vshn/k8up/constants"
"github.com/vshn/k8up/handler"
"github.com/vshn/k8up/job"
)
Expand Down Expand Up @@ -55,7 +56,11 @@ func (r *ScheduleReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return reconcile.Result{}, err
}

config := job.NewConfig(ctx, r.Client, log, schedule, r.Scheme)
repository := constants.GetGlobalRepository()
if schedule.Spec.Backend != nil {
repository = schedule.Spec.Backend.String()
}
config := job.NewConfig(ctx, r.Client, log, schedule, r.Scheme, repository)

return ctrl.Result{}, handler.NewScheduleHandler(config, schedule).Handle()
}
Expand Down
1 change: 0 additions & 1 deletion executor/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func DefaultEnv(namespace string) EnvVarConverter {
defaults := NewEnvVarConverter()

defaults.SetString("STATS_URL", constants.GetGlobalStatsURL())
defaults.SetString(constants.ResticPasswordEnvName, constants.GetGlobalRepoPassword())
defaults.SetString(constants.ResticRepositoryEnvName, fmt.Sprintf("s3:%s/%s", constants.GetGlobalS3Endpoint(), constants.GetGlobalS3Bucket()))
defaults.SetString(constants.ResticPasswordEnvName, constants.GetGlobalRepoPassword())
defaults.SetString(constants.AwsAccessKeyIDEnvName, constants.GetGlobalAccessKey())
Expand Down
52 changes: 52 additions & 0 deletions executor/generic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package executor

import (
"testing"

corev1 "k8s.io/api/core/v1"
)

func TestEnvVarConverter_Merge(t *testing.T) {
vars := NewEnvVarConverter()
vars.SetString("nooverridestr", "original")
vars.SetEnvVarSource("nooverridesrc", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "original"}})
vars.SetString("nomergestr", "original")
vars.SetEnvVarSource("nomergesource", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "original"}})

src := NewEnvVarConverter()
src.SetString("nooverridestr", "updated")
src.SetEnvVarSource("nooverridestr", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "updated"}})
src.SetEnvVarSource("nooverridesrc", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "updated"}})
src.SetString("newstr", "original")
src.SetEnvVarSource("newsource", &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{Key: "original"}})

if err := vars.Merge(src); err != nil {
t.Errorf("unable to merge: %v", err)
}

v := vars.Vars

if *v["nooverridestr"].stringEnv != "original" {
t.Error("nooverridestr should not have been updated.")
}
if v["nooverridestr"].envVarSource != nil {
t.Error("nooverridestr should not have been updated.")
}
if v["nooverridesrc"].envVarSource.SecretKeyRef.Key != "original" {
t.Error("nooverridesrc should not have been updated.")
}

if *v["nomergestr"].stringEnv != "original" {
t.Error("nomergestr should not have been updated.")
}
if v["nomergesource"].envVarSource.SecretKeyRef.Key != "original" {
t.Error("nomergesource should not have been updated.")
}

if *v["newstr"].stringEnv != "original" {
t.Error("newstr should have been merged in.")
}
if v["newsource"].envVarSource.SecretKeyRef.Key != "original" {
t.Error("nomergesource should have been merged in.")
}
}
2 changes: 1 addition & 1 deletion executor/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,6 @@ func jobMatcher(restoreType string, additionalArgs []string, env Elements, volum
}

func newConfig() *job.Config {
cfg := job.NewConfig(context.TODO(), nil, nil, &k8upv1alpha1.Restore{}, scheme)
cfg := job.NewConfig(context.TODO(), nil, nil, &k8upv1alpha1.Restore{}, scheme, "")
return &cfg
}
25 changes: 14 additions & 11 deletions executor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,22 @@ func (qe *QueueWorker) loopRepositoryJobs(repository string) {
shouldRun = !observer.GetObserver().IsExclusiveJobRunning(repository)
}

if shouldRun {
err := job.Execute()
if err != nil {
if !errors.IsAlreadyExists(err) {
job.Logger().Error(err, "cannot create job", "repository", repository)
}
}
if !shouldRun {
job.Logger().Info("skipping job due to exclusivity", "exclusive", job.Exclusive(), "repository", job.GetRepository())
continue
}

// Skip the rest for this repository if we just started an exclusive
// job.
if job.Exclusive() {
return
err := job.Execute()
if err != nil {
if !errors.IsAlreadyExists(err) {
job.Logger().Error(err, "cannot create job", "repository", repository)
}
}

// Skip the rest for this repository if we just started an exclusive
// job.
if job.Exclusive() {
return
}
}
}
4 changes: 3 additions & 1 deletion handler/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func (h *Handler) Handle() error {
func (h *Handler) queueJob() error {
h.Log.Info("adding job to the queue")

queue.GetExecQueue().Add(executor.NewExecutor(h.Config))
e := executor.NewExecutor(h.Config)

queue.GetExecQueue().Add(e)

return nil
}
7 changes: 4 additions & 3 deletions handler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ func (j *JobHandler) Handle() error {
}

oj := observer.ObservableJob{
Job: j.job,
Exclusive: exclusive,
Event: jobEvent,
Job: j.job,
Exclusive: exclusive,
Event: jobEvent,
Repository: j.Repository,
}

observer.GetObserver().GetUpdateChannel() <- oj
Expand Down
Loading