Skip to content

Commit

Permalink
Avoid duplicated Partitions being created from a PartitionSet.
Browse files Browse the repository at this point in the history
Signed-off-by: Frederic Giloux <fgiloux@redhat.com>
  • Loading branch information
fgiloux committed Mar 15, 2023
1 parent 3f8a000 commit 0b72c75
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 57 deletions.
31 changes: 22 additions & 9 deletions pkg/reconciler/topology/partitionset/partitionset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -41,7 +42,6 @@ import (
topologyv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/topology/v1alpha1"
coreinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/core/v1alpha1"
topologyinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/topology/v1alpha1"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
)
Expand Down Expand Up @@ -71,12 +71,29 @@ func NewController(
getPartitionSet: func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.PartitionSet, error) {
return partitionSetClusterInformer.Lister().Cluster(clusterName).Get(name)
},
getPartitionsByPartitionSet: func(partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) {
key, err := kcpcache.MetaClusterNamespaceKeyFunc(partitionSet)
// getPartitionsByPartitionSet calls the API directly instead of using an indexer
// as otherwise duplicates are created when the cache is not updated quickly enough.
getPartitionsByPartitionSet: func(ctx context.Context, partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) {
partitions, err := kcpClusterClient.Cluster(logicalcluster.From(partitionSet).Path()).TopologyV1alpha1().Partitions().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}
return indexers.ByIndex[*topologyv1alpha1.Partition](partitionClusterInformer.Informer().GetIndexer(), indexPartitionsByPartitionSet, key)
result := []*topologyv1alpha1.Partition{}
for i := range partitions.Items {
for _, owner := range partitions.Items[i].OwnerReferences {
ownerGV, err := schema.ParseGroupVersion(owner.APIVersion)
if err != nil {
continue
}
if owner.UID == partitionSet.UID &&
owner.Kind == "PartitionSet" &&
ownerGV.Group == topologyv1alpha1.SchemeGroupVersion.Group {
result = append(result, &partitions.Items[i])
break
}
}
}
return result, nil
},
createPartition: func(ctx context.Context, path logicalcluster.Path, partition *topologyv1alpha1.Partition) (*topologyv1alpha1.Partition, error) {
return kcpClusterClient.Cluster(path).TopologyV1alpha1().Partitions().Create(ctx, partition, metav1.CreateOptions{})
Expand All @@ -88,10 +105,6 @@ func NewController(
commit: committer.NewCommitter[*PartitionSet, Patcher, *PartitionSetSpec, *PartitionSetStatus](kcpClusterClient.TopologyV1alpha1().PartitionSets()),
}

indexers.AddIfNotPresentOrDie(partitionClusterInformer.Informer().GetIndexer(), cache.Indexers{
indexPartitionsByPartitionSet: indexPartitionsByPartitionSetFunc,
})

globalShardClusterInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
Expand Down Expand Up @@ -157,7 +170,7 @@ type controller struct {
listShards func(selector labels.Selector) ([]*corev1alpha1.Shard, error)
listPartitionSets func() ([]*topologyv1alpha1.PartitionSet, error)
getPartitionSet func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.PartitionSet, error)
getPartitionsByPartitionSet func(partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error)
getPartitionsByPartitionSet func(ctx context.Context, partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error)
createPartition func(ctx context.Context, path logicalcluster.Path, partition *topologyv1alpha1.Partition) (*topologyv1alpha1.Partition, error)
deletePartition func(ctx context.Context, path logicalcluster.Path, partitionName string) error
commit CommitFunc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestReconcile(t *testing.T) {
}, nil
},

getPartitionsByPartitionSet: func(partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) {
getPartitionsByPartitionSet: func(ctx context.Context, partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) {
var partitions []*topologyv1alpha1.Partition
if tc.existingValidPartition {
partition := &topologyv1alpha1.Partition{
Expand Down
46 changes: 0 additions & 46 deletions pkg/reconciler/topology/partitionset/partitionset_indexes.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *controller) reconcile(ctx context.Context, partitionSet *topologyv1alph
return err
}

oldPartitions, err := c.getPartitionsByPartitionSet(partitionSet)
oldPartitions, err := c.getPartitionsByPartitionSet(ctx, partitionSet)
if err != nil {
conditions.MarkFalse(
partitionSet,
Expand Down

0 comments on commit 0b72c75

Please sign in to comment.