Skip to content

Commit

Permalink
refactoring of job type from string to JobType type
Browse files Browse the repository at this point in the history
  • Loading branch information
susana-garcia committed Dec 1, 2020
1 parent ccc2f58 commit 6393abc
Show file tree
Hide file tree
Showing 18 changed files with 87 additions and 54 deletions.
5 changes: 3 additions & 2 deletions api/v1alpha1/archive_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1

import (
"github.com/vshn/k8up/cfg"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
Expand Down Expand Up @@ -58,8 +59,8 @@ func (a *Archive) GetMetaObject() metav1.Object {
return a
}

func (*Archive) GetType() string {
return "archive"
func (*Archive) GetType() cfg.JobType {
return cfg.Archive
}

func (a *Archive) GetK8upStatus() *K8upStatus {
Expand Down
5 changes: 3 additions & 2 deletions api/v1alpha1/backup_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1

import (
"github.com/vshn/k8up/cfg"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
Expand Down Expand Up @@ -81,8 +82,8 @@ func (b *Backup) GetMetaObject() metav1.Object {
return b
}

func (*Backup) GetType() string {
return "backup"
func (*Backup) GetType() cfg.JobType {
return cfg.Backup
}

func (b *Backup) GetK8upStatus() *K8upStatus {
Expand Down
5 changes: 3 additions & 2 deletions api/v1alpha1/check_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1

import (
"github.com/vshn/k8up/cfg"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
Expand Down Expand Up @@ -51,8 +52,8 @@ func (c *Check) GetMetaObject() metav1.Object {
return c
}

func (c *Check) GetType() string {
return "check"
func (c *Check) GetType() cfg.JobType {
return cfg.Check
}

func (c *Check) GetK8upStatus() *K8upStatus {
Expand Down
5 changes: 3 additions & 2 deletions api/v1alpha1/prune_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1

import (
"github.com/vshn/k8up/cfg"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
Expand Down Expand Up @@ -64,8 +65,8 @@ func (p *Prune) GetMetaObject() metav1.Object {
return p
}

func (p *Prune) GetType() string {
return "prune"
func (p *Prune) GetType() cfg.JobType {
return cfg.Prune
}

func (p *Prune) GetK8upStatus() *K8upStatus {
Expand Down
5 changes: 3 additions & 2 deletions api/v1alpha1/restore_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1alpha1

import (
"github.com/vshn/k8up/cfg"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -65,8 +66,8 @@ func (r *Restore) GetMetaObject() metav1.Object {
return r
}

func (r *Restore) GetType() string {
return "restore"
func (r *Restore) GetType() cfg.JobType {
return cfg.Restore
}

func (r *Restore) GetK8upStatus() *K8upStatus {
Expand Down
5 changes: 3 additions & 2 deletions api/v1alpha1/schedule_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1alpha1
import (
"fmt"

"github.com/vshn/k8up/cfg"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
Expand Down Expand Up @@ -100,8 +101,8 @@ func (s *Schedule) GetMetaObject() metav1.Object {
return s
}

func (*Schedule) GetType() string {
return "schedule"
func (*Schedule) GetType() cfg.JobType {
return cfg.Schedule
}

func (s *Schedule) GetK8upStatus() *K8upStatus {
Expand Down
11 changes: 11 additions & 0 deletions cfg/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package cfg

// JobType is the type the jobs can have
type JobType string

const (
RestoreS3EndpointEnvName = "RESTORE_S3ENDPOINT"
RestoreS3AccessKeyIDEnvName = "RESTORE_ACCESSKEYID"
Expand All @@ -8,6 +11,14 @@ const (
ResticPasswordEnvName = "RESTIC_PASSWORD"
AwsAccessKeyIDEnvName = "AWS_ACCESS_KEY_ID"
AwsSecretAccessKeyEnvName = "AWS_SECRET_ACCESS_KEY"

Archive JobType = "archive"
Backup JobType = "backup"
Check JobType = "check"
Prune JobType = "prune"
Restore JobType = "restore"
Schedule JobType = "schedule"
Job JobType = ""
)

// Configuration holds a strongly-typed tree of the configuration
Expand Down
6 changes: 6 additions & 0 deletions executor/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
stderrors "errors"

"github.com/vshn/k8up/cfg"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
Expand All @@ -28,6 +29,11 @@ func NewArchiveExecutor(config job.Config) *ArchiveExecutor {
}
}

// GetConcurrencyLimit returns the concurrent jobs limit
func (a *ArchiveExecutor) GetConcurrencyLimit() int {
return cfg.Config.GlobalConcurrentArchiveJobsLimit
}

// Execute creates the actual batch.job on the k8s api.
func (a *ArchiveExecutor) Execute() error {
archive, ok := a.Obj.(*k8upv1alpha1.Archive)
Expand Down
8 changes: 7 additions & 1 deletion executor/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package executor

import (
stderrors "errors"
"github.com/vshn/k8up/cfg"
"path"
"strconv"

"github.com/vshn/k8up/cfg"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
"github.com/vshn/k8up/job"
"github.com/vshn/k8up/observer"
Expand Down Expand Up @@ -39,6 +40,11 @@ func NewBackupExecutor(config job.Config) *BackupExecutor {
}
}

// GetConcurrencyLimit returns the concurrent jobs limit
func (b *BackupExecutor) GetConcurrencyLimit() int {
return cfg.Config.GlobalConcurrentBackupJobsLimit
}

// Execute triggers the actual batch.job creation on the cluster. It will also register
// a callback function on the observer so the prebackup pods can be removed after the backup
// has finished.
Expand Down
6 changes: 6 additions & 0 deletions executor/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
stderrors "errors"

"github.com/vshn/k8up/cfg"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
Expand All @@ -22,6 +23,11 @@ func NewCheckExecutor(config job.Config) *CheckExecutor {
}
}

// GetConcurrencyLimit returns the concurrent jobs limit
func (c *CheckExecutor) GetConcurrencyLimit() int {
return cfg.Config.GlobalConcurrentCheckJobsLimit
}

// Execute creates the actual batch.job on the k8s api.
func (c *CheckExecutor) Execute() error {
checkObject, ok := c.Obj.(*k8upv1alpha1.Check)
Expand Down
15 changes: 8 additions & 7 deletions executor/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package executor

import (
"fmt"
"github.com/vshn/k8up/cfg"
"sort"

"github.com/vshn/k8up/cfg"

"github.com/go-logr/logr"
"github.com/imdario/mergo"
"github.com/vshn/k8up/job"
Expand Down Expand Up @@ -112,22 +113,22 @@ func (g *generic) GetJobNamespace() string {
return g.Obj.GetMetaObject().GetNamespace()
}

func (g *generic) GetJobType() string {
func (g *generic) GetJobType() cfg.JobType {
return g.Obj.GetType()
}

// NewExecutor will return the right Executor for the given job object.
func NewExecutor(config job.Config) queue.Executor {
switch config.Obj.GetType() {
case "backup":
case cfg.Backup:
return NewBackupExecutor(config)
case "check":
case cfg.Check:
return NewCheckExecutor(config)
case "archive":
case cfg.Archive:
return NewArchiveExecutor(config)
case "prune":
case cfg.Prune:
return NewPruneExecutor(config)
case "restore":
case cfg.Restore:
return NewRestoreExecutor(config)
}
return nil
Expand Down
8 changes: 7 additions & 1 deletion executor/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package executor

import (
"errors"
"github.com/vshn/k8up/cfg"
"strconv"
"strings"

"github.com/vshn/k8up/cfg"

k8upv1alpha1 "github.com/vshn/k8up/api/v1alpha1"
"github.com/vshn/k8up/job"
"github.com/vshn/k8up/observer"
Expand All @@ -28,6 +29,11 @@ func NewPruneExecutor(config job.Config) *PruneExecutor {
}
}

// GetConcurrencyLimit returns the concurrent jobs limit
func (p *PruneExecutor) GetConcurrencyLimit() int {
return cfg.Config.GlobalConcurrentPruneJobsLimit
}

// Execute creates the actual batch.job on the k8s api.
func (p *PruneExecutor) Execute() error {
prune, ok := p.Obj.(*k8upv1alpha1.Prune)
Expand Down
11 changes: 10 additions & 1 deletion executor/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package executor

import (
"errors"
"github.com/vshn/k8up/cfg"
"strconv"

"github.com/vshn/k8up/cfg"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -22,12 +23,19 @@ type RestoreExecutor struct {
generic
}

// NewRestoreExecutor will return a new executor for Restore jobs.
func NewRestoreExecutor(config job.Config) *RestoreExecutor {
return &RestoreExecutor{
generic: generic{config},
}
}

// GetConcurrencyLimit returns the concurrent jobs limit
func (r *RestoreExecutor) GetConcurrencyLimit() int {
return cfg.Config.GlobalConcurrentRestoreJobsLimit
}

// Execute creates the actual batch.job on the k8s api.
func (r *RestoreExecutor) Execute() error {
restore, ok := r.Obj.(*k8upv1alpha1.Restore)
if !ok {
Expand All @@ -43,6 +51,7 @@ func (r *RestoreExecutor) Execute() error {
return nil
}

// Exclusive should return true for jobs that can't run while other jobs run.
func (r *RestoreExecutor) Exclusive() bool {
return true
}
Expand Down
3 changes: 2 additions & 1 deletion executor/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (qe *QueueWorker) loopRepositoryJobs(repository string) {
for !queue.GetExecQueue().IsEmpty(repository) {
job := queue.GetExecQueue().Get(repository)
jobType := job.GetJobType()
jobLimit := job.GetConcurrencyLimit()

var shouldRun bool
if job.Exclusive() {
Expand All @@ -55,7 +56,7 @@ func (qe *QueueWorker) loopRepositoryJobs(repository string) {
shouldRun = !observer.GetObserver().IsAnyJobRunning(repository)
} else {
shouldRun = !observer.GetObserver().IsExclusiveJobRunning(repository) &&
!observer.GetObserver().IsLimitConcurrentJobsReached(jobType, repository)
!observer.GetObserver().IsLimitConcurrentJobsReached(jobType, jobLimit, repository)
}

if shouldRun {
Expand Down
3 changes: 2 additions & 1 deletion handler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"

"github.com/vshn/k8up/cfg"
"github.com/vshn/k8up/job"
"github.com/vshn/k8up/observer"
batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -75,7 +76,7 @@ func (j *JobHandler) Handle() error {
exclusive = false
}

jobType := ""
jobType := cfg.Job
if j.Config.Obj != nil {
jobType = j.Config.Obj.GetType()
}
Expand Down
3 changes: 2 additions & 1 deletion job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package job

import (
"context"

"github.com/vshn/k8up/cfg"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -41,7 +42,7 @@ type Object interface {
GetMetaObject() metav1.Object
GetRuntimeObject() runtime.Object
GetK8upStatus() *k8upv1alpha1.K8upStatus
GetType() string
GetType() cfg.JobType
}

// NewConfig returns a new configuration.
Expand Down
Loading

0 comments on commit 6393abc

Please sign in to comment.