Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor backup cronjob #255

Merged
merged 3 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/internal/testutil/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func BackupForCluster(cluster *api.MysqlCluster) gomegatypes.GomegaMatcher {
func BackupWithName(name string) gomegatypes.GomegaMatcher {
return MatchFields(IgnoreExtras, Fields{
"ObjectMeta": MatchFields(IgnoreExtras, Fields{
"Name": Equal(name),
"ClusterName": Equal(name),
}),
})
}
145 changes: 52 additions & 93 deletions pkg/controller/mysqlbackupcron/job_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,133 +20,92 @@ import (
"context"
"fmt"
"sort"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"

api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1"
)

var (
// polling time for backup to be completed
backupPollingTime = 5 * time.Second
// time to wait for a backup to be completed
backupWatchTimeout = time.Hour
)

// The job structure contains the context to schedule a backup
type job struct {
Name string
Namespace string
ClusterName string
Namespace string

BackupRunning *bool

lock *sync.Mutex
c client.Client
// kubernetes client
c client.Client

BackupScheduleJobsHistoryLimit *int
}

func (j job) Run() {
backupName := fmt.Sprintf("%s-auto-backup-%s", j.Name, time.Now().Format("2006-01-02t15-04-05"))
backupKey := types.NamespacedName{Name: backupName, Namespace: j.Namespace}
log.Info("scheduled backup job started", "namespace", j.Namespace, "name", backupName)
func (j *job) Run() {
log.Info("scheduled backup job started", "namespace", j.Namespace, "cluster_name", j.ClusterName)

// run garbage collector if needed
if j.BackupScheduleJobsHistoryLimit != nil {
defer j.backupGC()
}

// Wrap backup creation to ensure that lock is released when backup is
// created

created := func() bool {
j.lock.Lock()
defer j.lock.Unlock()

if *j.BackupRunning {
log.Info("last scheduled backup still running! Can't initiate a new backup",
"cluster", fmt.Sprintf("%s/%s", j.Namespace, j.Name))
return false
}
// check if a backup is running
if j.scheduledBackupsRunningCount() > 0 {
log.V(1).Info("at least a backup is running",
"backups_len", j.scheduledBackupsRunningCount())
return
}

tries := 0
for {
var err error
cluster := &api.MysqlBackup{
ObjectMeta: metav1.ObjectMeta{
Name: backupName,
Namespace: j.Namespace,
Labels: map[string]string{
"recurrent": "true",
},
},
Spec: api.MysqlBackupSpec{
ClusterName: j.Name,
},
}
if err = j.c.Create(context.TODO(), cluster); err == nil {
break
}
// create the backup
if _, err := j.createBackup(); err != nil {
log.Error(err, "failed to create backup")
}
}

if tries > 5 {
log.Error(err, "fail to create backup, max tries exceeded",
"cluster", j.Name, "retries", tries, "backup", backupName)
return false
}
func (j *job) scheduledBackupsRunningCount() int {
backupsList := &api.MysqlBackupList{}
// select all backups with labels recurrent=true and and not completed of the cluster
selector := j.backupSelector()
selector.MatchingField("status.completed", "false")

log.Info("failed to create backup, retring", "backup", backupName,
"error", err, "tries", tries)
if err := j.c.List(context.TODO(), selector, backupsList); err != nil {
log.Error(err, "failed getting backups", "selector", selector)
return 0
}

time.Sleep(5 * time.Second)
tries++
}
return len(backupsList.Items)
}

*j.BackupRunning = true
return true
}()
if !created {
return
func (j *job) createBackup() (*api.MysqlBackup, error) {
backupName := fmt.Sprintf("%s-auto-%s", j.ClusterName, time.Now().Format("2006-01-02t15-04-05"))

backup := &api.MysqlBackup{
ObjectMeta: metav1.ObjectMeta{
Name: backupName,
Namespace: j.Namespace,
Labels: j.recurrentBackupLabels(),
},
Spec: api.MysqlBackupSpec{
ClusterName: j.ClusterName,
},
}
return backup, j.c.Create(context.TODO(), backup)
}

defer func() {
j.lock.Lock()
defer j.lock.Unlock()
*j.BackupRunning = false
}()

err := wait.PollImmediate(backupPollingTime, backupWatchTimeout, func() (bool, error) {
backup := &api.MysqlBackup{}
if err := j.c.Get(context.TODO(), backupKey, backup); err != nil {
log.Info("failed to get backup", "backup", backupName, "error", err)
return false, nil
}
if backup.Status.Completed {
log.Info("backup finished", "backup", backup)
return true, nil
}

return false, nil
})
func (j *job) backupSelector() *client.ListOptions {
return client.InNamespace(j.Namespace).MatchingLabels(j.recurrentBackupLabels())
}

if err != nil {
log.Error(err, "waiting for backup to finish, failed",
"backup", backupName, "cluster", fmt.Sprintf("%s/%s", j.Namespace, j.Name))
func (j *job) recurrentBackupLabels() map[string]string {
return map[string]string{
"recurrent": "true",
"cluster": j.ClusterName,
}
}

func (j *job) backupGC() {
var err error

backupsList := &api.MysqlBackupList{}
selector := &client.ListOptions{}
selector = selector.InNamespace(j.Namespace).MatchingLabels(map[string]string{"recurrent": "true"})

if err = j.c.List(context.TODO(), selector, backupsList); err != nil {
log.Error(err, "failed getting backups", "selector", selector)
if err = j.c.List(context.TODO(), j.backupSelector(), backupsList); err != nil {
log.Error(err, "failed getting backups", "selector", j.backupSelector())
return
}

Expand Down
76 changes: 54 additions & 22 deletions pkg/controller/mysqlbackupcron/job_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package mysqlbackupcron
import (
"fmt"
"math/rand"
"sync"
"time"

. "github.com/onsi/ginkgo"
Expand All @@ -44,36 +43,51 @@ var _ = Describe("MysqlBackupCron cron job", func() {
c client.Client
// stop channel for controller manager
stop chan struct{}

clusterName string
namespace string
j *job
)

BeforeEach(func() {
mgr, err := manager.New(cfg, manager.Options{})
Expect(err).To(Succeed())
c = mgr.GetClient()

// NOTE: field indexer should be added before starting the manager
Expect(addBackupFieldIndexers(mgr)).To(Succeed())

stop = StartTestManager(mgr)

clusterName = fmt.Sprintf("cl-%d", rand.Int31())
namespace = "default"

limit := 5
j = &job{
ClusterName: clusterName,
Namespace: namespace,
c: c,
BackupScheduleJobsHistoryLimit: &limit,
}
})
AfterEach(func() {
close(stop)
})

When("more backups are created", func() {
var (
clusterName string
ns string
backups []api.MysqlBackup
backups []api.MysqlBackup
)

BeforeEach(func() {
clusterName = fmt.Sprintf("cl-%d", rand.Int31())
ns = "default"

for i := 0; i < 10; i++ {
for i := 0; i < (*j.BackupScheduleJobsHistoryLimit + 5); i++ {
backup := api.MysqlBackup{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("bk-%d", i),
Namespace: ns,
Namespace: namespace,
Labels: map[string]string{
"recurrent": "true",
"cluster": clusterName,
},
},
Spec: api.MysqlBackupSpec{
Expand All @@ -82,7 +96,7 @@ var _ = Describe("MysqlBackupCron cron job", func() {
}
Expect(c.Create(context.TODO(), &backup)).To(Succeed())
backups = append(backups, backup)
time.Sleep(time.Second / 3)
time.Sleep(time.Second / 6)
}
})

Expand All @@ -93,29 +107,47 @@ var _ = Describe("MysqlBackupCron cron job", func() {
})

It("should delete only older backups", func() {
f := false
limit := len(backups) - 5
j := job{
Name: clusterName,
Namespace: ns,
BackupRunning: &f,
lock: &sync.Mutex{},
c: c,
BackupScheduleJobsHistoryLimit: &limit,
}

lo := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{
"recurrent": "true",
"cluster": clusterName,
}),
Namespace: ns,
Namespace: namespace,
}
Eventually(testutil.ListAllBackupsFn(c, lo)).Should(HaveLen(len(backups)))

j.backupGC()

Eventually(testutil.ListAllBackupsFn(c, lo)).Should(HaveLen(limit))
Eventually(testutil.ListAllBackupsFn(c, lo)).Should(HaveLen(*j.BackupScheduleJobsHistoryLimit))
Eventually(testutil.ListAllBackupsFn(c, lo)).ShouldNot(
ContainElement(testutil.BackupWithName("bk-3")))
})
})

When("a backup exists", func() {
var (
backup *api.MysqlBackup
)

BeforeEach(func() {
var err error
backup, err = j.createBackup()
Expect(err).To(Succeed())
})
AfterEach(func() {
c.Delete(context.TODO(), backup)
})

It("should detect the running backup", func() {
Eventually(j.scheduledBackupsRunningCount).Should(Equal(1))
})

It("should not detect any running backup", func() {
backup.Status.Completed = true
Expect(c.Update(context.TODO(), backup)).To(Succeed())

Eventually(j.scheduledBackupsRunningCount).Should(Equal(0))
})
})
})
22 changes: 15 additions & 7 deletions pkg/controller/mysqlbackupcron/mysqlbackupcron_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}

return nil
return addBackupFieldIndexers(mgr)
}

var _ reconcile.Reconciler = &ReconcileMysqlBackup{}
Expand Down Expand Up @@ -147,8 +147,8 @@ func (r *ReconcileMysqlBackup) updateClusterSchedule(cluster *mysqlv1alpha1.Mysq
defer r.lockJobRegister.Unlock()

for _, entry := range r.cron.Entries() {
j, ok := entry.Job.(job)
if ok && j.Name == cluster.Name && j.Namespace == cluster.Namespace {
j, ok := entry.Job.(*job)
if ok && j.ClusterName == cluster.Name && j.Namespace == cluster.Namespace {
log.V(1).Info("cluster already added to cron.", "cluster", cluster.Name)

// change scheduler for already added crons
Expand Down Expand Up @@ -182,12 +182,10 @@ func (r *ReconcileMysqlBackup) updateClusterSchedule(cluster *mysqlv1alpha1.Mysq
}
}

r.cron.Schedule(schedule, job{
Name: cluster.Name,
r.cron.Schedule(schedule, &job{
ClusterName: cluster.Name,
Namespace: cluster.Namespace,
c: r.Client,
BackupRunning: new(bool),
lock: new(sync.Mutex),
BackupScheduleJobsHistoryLimit: cluster.Spec.BackupScheduleJobsHistoryLimit,
}, cluster.Name)

Expand All @@ -204,3 +202,13 @@ func (r *ReconcileMysqlBackup) unregisterCluster(clusterKey types.NamespacedName

return nil
}

func addBackupFieldIndexers(mgr manager.Manager) error {
return mgr.GetFieldIndexer().IndexField(&mysqlv1alpha1.MysqlBackup{}, "status.completed", func(b runtime.Object) []string {
completed := "false"
if b.(*mysqlv1alpha1.MysqlBackup).Status.Completed {
completed = "true"
}
return []string{completed}
})
}
Loading