Skip to content

Commit

Permalink
Merge pull request nephio-project#118 from Nordix/session_conflict_re…
Browse files Browse the repository at this point in the history
…solution

Add session-conflict resolution for concurrent CUD operations on package revisions
  • Loading branch information
nephio-prow[bot] authored Oct 21, 2024
2 parents a3d7d29 + ed9096e commit dca2f6d
Show file tree
Hide file tree
Showing 8 changed files with 545 additions and 43 deletions.
3 changes: 2 additions & 1 deletion pkg/cache/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,13 @@ func (r *cachedRepository) Close() error {
// There isn't much use in returning an error here, so we just log it
// and create a PackageRevisionMeta with just name and namespace. This
// makes sure that the Delete event is sent.
klog.Warningf("Error looking up PackageRev CR for %s: %v", nn.Name, err)
klog.Warningf("repo %s: error deleting packagerev for %s: %v", r.id, nn.Name, err)
pkgRevMeta = meta.PackageRevisionMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}
}
klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name)
sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
}
klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
Expand Down
23 changes: 15 additions & 8 deletions pkg/meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -194,16 +195,22 @@ func (c *crdMetadataStore) Delete(ctx context.Context, namespacedName types.Name
defer span.End()

var internalPkgRev internalapi.PackageRev
err := c.coreClient.Get(ctx, namespacedName, &internalPkgRev)
if err != nil {
return PackageRevisionMeta{}, err
}
retriedErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := c.coreClient.Get(ctx, namespacedName, &internalPkgRev)
if err != nil {
return err
}

if clearFinalizers {
internalPkgRev.Finalizers = []string{}
if err = c.coreClient.Update(ctx, &internalPkgRev); err != nil {
return PackageRevisionMeta{}, err
if clearFinalizers {
internalPkgRev.Finalizers = []string{}
if err = c.coreClient.Update(ctx, &internalPkgRev); err != nil {
return err
}
}
return nil
})
if retriedErr != nil {
return PackageRevisionMeta{}, retriedErr
}

klog.Infof("Deleting packagerev %s/%s", internalPkgRev.Namespace, internalPkgRev.Name)
Expand Down
37 changes: 36 additions & 1 deletion pkg/registry/porch/packagecommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package porch
import (
"context"
"fmt"
"sync"

unversionedapi "github.com/nephio-project/porch/api/porch"
api "github.com/nephio-project/porch/api/porch/v1alpha1"
Expand All @@ -34,6 +35,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const ConflictErrorMsgBase = "another request is already in progress %s"

var GenericConflictErrorMsg = fmt.Sprintf(ConflictErrorMsgBase, "on %s \"%s\"")

var mutexMapMutex sync.Mutex
var pkgRevOperationMutexes = map[string]*sync.Mutex{}

type packageCommon struct {
// scheme holds our scheme, for type conversions etc
scheme *runtime.Scheme
Expand Down Expand Up @@ -210,9 +218,21 @@ func (r *packageCommon) updatePackageRevision(ctx context.Context, name string,
return nil, false, apierrors.NewBadRequest("namespace must be specified")
}

pkgMutexKey := getPackageMutexKey(ns, name)
pkgMutex := getMutexForPackage(pkgMutexKey)

locked := pkgMutex.TryLock()
if !locked {
return nil, false,
apierrors.NewConflict(
api.Resource("packagerevisions"),
name,
fmt.Errorf(GenericConflictErrorMsg, "package revision", pkgMutexKey))
}
defer pkgMutex.Unlock()

// isCreate tracks whether this is an update that creates an object (this happens in server-side apply)
isCreate := false

oldRepoPkgRev, err := r.getRepoPkgRev(ctx, name)
if err != nil {
if forceAllowCreate && apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -469,3 +489,18 @@ func (r *packageCommon) validateUpdate(ctx context.Context, newRuntimeObj runtim
r.updateStrategy.Canonicalize(newRuntimeObj)
return nil
}

func getPackageMutexKey(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

func getMutexForPackage(pkgMutexKey string) *sync.Mutex {
mutexMapMutex.Lock()
defer mutexMapMutex.Unlock()
pkgMutex, alreadyPresent := pkgRevOperationMutexes[pkgMutexKey]
if !alreadyPresent {
pkgMutex = &sync.Mutex{}
pkgRevOperationMutexes[pkgMutexKey] = pkgMutex
}
return pkgMutex
}
60 changes: 53 additions & 7 deletions pkg/registry/porch/packagerevision.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,11 @@ var _ rest.GracefulDeleter = &packageRevisions{}
var _ rest.Watcher = &packageRevisions{}
var _ rest.SingularNameProvider = &packageRevisions{}


// GetSingularName implements the SingularNameProvider interface
func (r *packageRevisions) GetSingularName() (string) {
func (r *packageRevisions) GetSingularName() string {
return "packagerevision"
}


func (r *packageRevisions) New() runtime.Object {
return &api.PackageRevision{}
}
Expand Down Expand Up @@ -120,7 +118,7 @@ func (r *packageRevisions) Get(ctx context.Context, name string, options *metav1
}

// Create implements the Creater interface.
func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc,
func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc,
options *metav1.CreateOptions) (runtime.Object, error) {
ctx, span := tracer.Start(ctx, "packageRevisions::Create", trace.WithAttributes())
defer span.End()
Expand Down Expand Up @@ -166,6 +164,20 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj
parentPackage = p
}

pkgMutexKey := uncreatedPackageMutexKey(newApiPkgRev)
pkgMutex := getMutexForPackage(pkgMutexKey)

locked := pkgMutex.TryLock()
if !locked {
conflictError := creationConflictError(newApiPkgRev)
return nil,
apierrors.NewConflict(
api.Resource("packagerevisions"),
"(new creation)",
conflictError)
}
defer pkgMutex.Unlock()

createdRepoPkgRev, err := r.cad.CreatePackageRevision(ctx, repositoryObj, newApiPkgRev, parentPackage)
if err != nil {
return nil, apierrors.NewInternalError(err)
Expand All @@ -184,13 +196,12 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj
// Update finds a resource in the storage and updates it. Some implementations
// may allow updates creates the object - they should set the created boolean
// to true.
func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc,
func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc,
updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
ctx, span := tracer.Start(ctx, "packageRevisions::Update", trace.WithAttributes())
defer span.End()

return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate,
)
return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate)
}

// Delete implements the GracefulDeleter interface.
Expand Down Expand Up @@ -228,10 +239,45 @@ func (r *packageRevisions) Delete(ctx context.Context, name string, deleteValida
return nil, false, err
}

pkgMutexKey := getPackageMutexKey(ns, name)
pkgMutex := getMutexForPackage(pkgMutexKey)

locked := pkgMutex.TryLock()
if !locked {
return nil, false,
apierrors.NewConflict(
api.Resource("packagerevisions"),
name,
fmt.Errorf(GenericConflictErrorMsg, "package revision", pkgMutexKey))
}
defer pkgMutex.Unlock()

if err := r.cad.DeletePackageRevision(ctx, repositoryObj, repoPkgRev); err != nil {
return nil, false, apierrors.NewInternalError(err)
}

// TODO: Should we do an async delete?
return apiPkgRev, true, nil
}

func uncreatedPackageMutexKey(newApiPkgRev *api.PackageRevision) string {
return fmt.Sprintf("%s-%s-%s-%s",
newApiPkgRev.Namespace,
newApiPkgRev.Spec.RepositoryName,
newApiPkgRev.Spec.PackageName,
newApiPkgRev.Spec.WorkspaceName,
)
}

func creationConflictError(newApiPkgRev *api.PackageRevision) error {
return fmt.Errorf(
fmt.Sprintf(
ConflictErrorMsgBase,
"to create package revision with details namespace=%q, repository=%q, package=%q,workspace=%q",
),
newApiPkgRev.Namespace,
newApiPkgRev.Spec.RepositoryName,
newApiPkgRev.Spec.PackageName,
newApiPkgRev.Spec.WorkspaceName,
)
}
16 changes: 13 additions & 3 deletions pkg/registry/porch/packagerevisionresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ var _ rest.Scoper = &packageRevisionResources{}
var _ rest.Updater = &packageRevisionResources{}
var _ rest.SingularNameProvider = &packageRevisionResources{}


// GetSingularName implements the SingularNameProvider interface
func (r *packageRevisionResources) GetSingularName() (string) {
func (r *packageRevisionResources) GetSingularName() string {
return "packagerevisionresources"
}


func (r *packageRevisionResources) New() runtime.Object {
return &api.PackageRevisionResources{}
}
Expand Down Expand Up @@ -127,6 +125,18 @@ func (r *packageRevisionResources) Update(ctx context.Context, name string, objI
return nil, false, apierrors.NewBadRequest("namespace must be specified")
}

pkgMutexKey := getPackageMutexKey(ns, name)
pkgMutex := getMutexForPackage(pkgMutexKey)
locked := pkgMutex.TryLock()
if !locked {
return nil, false,
apierrors.NewConflict(
api.Resource("packagerevisionresources"),
name,
fmt.Errorf(GenericConflictErrorMsg, "package revision resources", pkgMutexKey))
}
defer pkgMutex.Unlock()

oldRepoPkgRev, err := r.packageCommon.getRepoPkgRev(ctx, name)
if err != nil {
return nil, false, err
Expand Down
Loading

0 comments on commit dca2f6d

Please sign in to comment.