Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs in restic repository ensurer #1235

Merged
merged 1 commit into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/1235-skriss
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix concurrency bug in code ensuring restic repository exists
60 changes: 58 additions & 2 deletions pkg/restic/repository_ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,31 @@ import (

// repositoryEnsurer ensures that Velero restic repositories are created and ready.
// It holds the lister/client used to look up ResticRepository objects, a map of
// channels keyed by string, and a set of per-repository mutexes that EnsureRepo
// uses to serialize calls for the same volume namespace + backup location.
type repositoryEnsurer struct {
// log is the base logger; EnsureRepo derives a per-call logger from it by
// adding the volumeNamespace and backupLocation fields.
log logrus.FieldLogger
// repoLister is used by EnsureRepo to list existing ResticRepositories
// matching a label selector.
repoLister velerov1listers.ResticRepositoryLister
// repoClient is the API client for ResticRepositories.
// NOTE(review): presumably used to create new repositories; the call site is
// not visible here — confirm.
repoClient velerov1client.ResticRepositoriesGetter

// readyChansLock is presumably the lock guarding readyChans — confirm
// against the code that reads/writes the map.
readyChansLock sync.Mutex
// readyChans maps a string key to a channel carrying *ResticRepository
// values. Entries are created on demand by getReadyChan and deleted by the
// informer event handler registered in newRepositoryEnsurer.
readyChans map[string]chan *velerov1api.ResticRepository

// repoLocksMu synchronizes reads/writes to the repoLocks map itself
// since maps are not threadsafe.
repoLocksMu sync.Mutex
// repoLocks holds one mutex per repoKey; entries are created lazily by
// repoLock and are never removed in the code shown here.
repoLocks map[repoKey]*sync.Mutex
}

// repoKey identifies a restic repository by the namespace of the volumes it
// covers and the backup location it belongs to. It is a comparable struct so
// that it can be used as the key of the repoLocks map.
type repoKey struct {
// volumeNamespace is the volumeNamespace argument passed to repoLock.
volumeNamespace string
// backupLocation is the backupLocation argument passed to repoLock.
backupLocation string
}

func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInformer, repoClient velerov1client.ResticRepositoriesGetter, log logrus.FieldLogger) *repositoryEnsurer {
r := &repositoryEnsurer{
log: log,
repoLister: repoInformer.Lister(),
repoClient: repoClient,
readyChans: make(map[string]chan *velerov1api.ResticRepository),
repoLocks: make(map[repoKey]*sync.Mutex),
}

repoInformer.Informer().AddEventHandler(
Expand All @@ -67,7 +80,7 @@ func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInforme
}

readyChan <- newObj
delete(r.readyChans, newObj.Name)
delete(r.readyChans, key)
}
},
},
Expand All @@ -84,6 +97,30 @@ func repoLabels(volumeNamespace, backupLocation string) labels.Set {
}

func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation string) (*velerov1api.ResticRepository, error) {
log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation)

// It's only safe to have one instance of this method executing at a time for a
// given volumeNamespace + backupLocation, so synchronize based on that. It's fine
// to run concurrently for *different* namespaces/locations. If you had 2 goroutines
// running this for the same inputs, both might find no ResticRepository exists, then
// both would create new ones for the same namespace/location.
//
// This issue could probably be avoided if we had a deterministic name for
// each restic repository, and we just tried to create it, checked for an
// AlreadyExists err, and then waited for it to be ready. However, there are
// already repositories in the wild with non-deterministic names (i.e. using
// GenerateName) which poses a backwards compatibility problem.
log.Debug("Acquiring lock")

repoMu := r.repoLock(volumeNamespace, backupLocation)
repoMu.Lock()
defer func() {
repoMu.Unlock()
log.Debug("Released lock")
}()

log.Debug("Acquired lock")

selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation))

repos, err := r.repoLister.ResticRepositories(namespace).List(selector)
Expand All @@ -97,11 +134,14 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
if repos[0].Status.Phase != velerov1api.ResticRepositoryPhaseReady {
return nil, errors.New("restic repository is not ready")
}

log.Debug("Ready repository found")
return repos[0], nil
}

// no repo found: create one and wait for it to be ready
log.Debug("No repository found, creating one")

// no repo found: create one and wait for it to be ready
repo := &velerov1api.ResticRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Expand Down Expand Up @@ -137,3 +177,19 @@ func (r *repositoryEnsurer) getReadyChan(name string) chan *velerov1api.ResticRe
r.readyChans[name] = make(chan *velerov1api.ResticRepository)
return r.readyChans[name]
}

// repoLock returns the mutex associated with the given volumeNamespace and
// backupLocation pair, creating and storing a new one if none is present yet.
// Access to the underlying map is guarded by repoLocksMu, so the same *sync.Mutex
// is handed back on every call made with the same pair.
func (r *repositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex {
	id := repoKey{
		volumeNamespace: volumeNamespace,
		backupLocation:  backupLocation,
	}

	r.repoLocksMu.Lock()
	defer r.repoLocksMu.Unlock()

	// A missing entry reads as nil, so a single lookup covers both the
	// "absent" and "present but nil" cases.
	mu := r.repoLocks[id]
	if mu == nil {
		mu = new(sync.Mutex)
		r.repoLocks[id] = mu
	}

	return mu
}