Skip to content

Commit

Permalink
pkg/restic: fix concurrency bugs causing dupe repos, panics
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <krisss@vmware.com>
  • Loading branch information
skriss committed Feb 26, 2019
1 parent e4771f5 commit e3e76c2
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/1235-skriss
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix concurrency bug in code ensuring restic repository exists
60 changes: 58 additions & 2 deletions pkg/restic/repository_ensurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,31 @@ import (

// repositoryEnsurer ensures that Velero restic repositories are created and ready.
//
// EnsureRepo calls for the same volume namespace + backup location are
// serialized through a per-key mutex obtained from repoLock; calls for
// different keys may proceed concurrently.
type repositoryEnsurer struct {
// log is the base logger; EnsureRepo derives a per-call logger from it by
// adding the volumeNamespace and backupLocation fields.
log logrus.FieldLogger
// repoLister comes from the repository informer and is used by EnsureRepo
// to list existing ResticRepository objects by label selector.
repoLister velerov1listers.ResticRepositoryLister
// repoClient is the API client for ResticRepository objects.
// NOTE(review): presumably used to create missing repositories; the call
// site is not visible in this excerpt, so confirm against EnsureRepo.
repoClient velerov1client.ResticRepositoriesGetter

// readyChansLock presumably guards readyChans; the locking code is not
// visible in this excerpt, so verify against getReadyChan and the informer
// event handler.
readyChansLock sync.Mutex
// readyChans maps a string key to the channel on which the informer event
// handler delivers the corresponding ResticRepository. The handler removes
// the entry after sending; getReadyChan creates entries.
readyChans map[string]chan *velerov1api.ResticRepository

// repoLocksMu synchronizes reads/writes to the repoLocks map itself
// since maps are not threadsafe.
repoLocksMu sync.Mutex
// repoLocks holds one mutex per repoKey; entries are created lazily by
// repoLock.
repoLocks map[repoKey]*sync.Mutex
}

// repoKey is the comparable key used to look up the per-repository mutex in
// repositoryEnsurer.repoLocks. It pairs the namespace of the volumes being
// handled with the name of the backup location.
type repoKey struct {
	volumeNamespace, backupLocation string
}

func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInformer, repoClient velerov1client.ResticRepositoriesGetter, log logrus.FieldLogger) *repositoryEnsurer {
r := &repositoryEnsurer{
log: log,
repoLister: repoInformer.Lister(),
repoClient: repoClient,
readyChans: make(map[string]chan *velerov1api.ResticRepository),
repoLocks: make(map[repoKey]*sync.Mutex),
}

repoInformer.Informer().AddEventHandler(
Expand All @@ -67,7 +80,7 @@ func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInforme
}

readyChan <- newObj
delete(r.readyChans, newObj.Name)
delete(r.readyChans, key)
}
},
},
Expand All @@ -84,6 +97,30 @@ func repoLabels(volumeNamespace, backupLocation string) labels.Set {
}

func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation string) (*velerov1api.ResticRepository, error) {
log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation)

// It's only safe to have one instance of this method executing concurrently for a
// given volumeNamespace + backupLocation, so synchronize based on that. It's fine
// to run concurrently for *different* namespaces/locations. If you had 2 goroutines
// running this for the same inputs, both might find no ResticRepository exists, then
// both would create new ones for the same namespace/location.
//
// This issue could probably be avoided if we had a deterministic name for
// each restic repository, and we just tried to create it, checked for an
// AlreadyExists err, and then waited for it to be ready. However, there are
// already repositories in the wild with non-deterministic names (i.e. using
// GenerateName) which poses a backwards compatibility problem.
log.Debug("Acquiring lock")

repoMu := r.repoLock(volumeNamespace, backupLocation)
repoMu.Lock()
defer func() {
repoMu.Unlock()
log.Debug("Released lock")
}()

log.Debug("Acquired lock")

selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation))

repos, err := r.repoLister.ResticRepositories(namespace).List(selector)
Expand All @@ -97,11 +134,14 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam
if repos[0].Status.Phase != velerov1api.ResticRepositoryPhaseReady {
return nil, errors.New("restic repository is not ready")
}

log.Debug("Ready repository found")
return repos[0], nil
}

// no repo found: create one and wait for it to be ready
log.Debug("No repository found, creating one")

// no repo found: create one and wait for it to be ready
repo := &velerov1api.ResticRepository{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Expand Down Expand Up @@ -137,3 +177,19 @@ func (r *repositoryEnsurer) getReadyChan(name string) chan *velerov1api.ResticRe
r.readyChans[name] = make(chan *velerov1api.ResticRepository)
return r.readyChans[name]
}

// repoLock returns the mutex dedicated to the given volumeNamespace +
// backupLocation pair, creating and registering it on first use. Every call
// with the same pair returns the same *sync.Mutex. Access to the underlying
// map is guarded by repoLocksMu, so this method may be called concurrently.
func (r *repositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex {
	k := repoKey{volumeNamespace: volumeNamespace, backupLocation: backupLocation}

	r.repoLocksMu.Lock()
	defer r.repoLocksMu.Unlock()

	mu := r.repoLocks[k]
	if mu == nil {
		// First request for this pair: allocate its mutex and remember it.
		mu = new(sync.Mutex)
		r.repoLocks[k] = mu
	}

	return mu
}

0 comments on commit e3e76c2

Please sign in to comment.