From 0b72c7597f893c5155a57af2689cb0dfe26da531 Mon Sep 17 00:00:00 2001 From: Frederic Giloux Date: Tue, 14 Mar 2023 09:24:56 +0100 Subject: [PATCH] Avoid duplicated Partitions being created from a PartitionSet. Signed-off-by: Frederic Giloux --- .../partitionset/partitionset_controller.go | 31 +++++++++---- .../partitionset_controller_test.go | 2 +- .../partitionset/partitionset_indexes.go | 46 ------------------- .../partitionset/partitionset_reconcile.go | 2 +- 4 files changed, 24 insertions(+), 57 deletions(-) delete mode 100644 pkg/reconciler/topology/partitionset/partitionset_indexes.go diff --git a/pkg/reconciler/topology/partitionset/partitionset_controller.go b/pkg/reconciler/topology/partitionset/partitionset_controller.go index 002d658156d..2ed2d9469bd 100644 --- a/pkg/reconciler/topology/partitionset/partitionset_controller.go +++ b/pkg/reconciler/topology/partitionset/partitionset_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -41,7 +42,6 @@ import ( topologyv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/topology/v1alpha1" coreinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/core/v1alpha1" topologyinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/topology/v1alpha1" - "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/committer" ) @@ -71,12 +71,29 @@ func NewController( getPartitionSet: func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.PartitionSet, error) { return partitionSetClusterInformer.Lister().Cluster(clusterName).Get(name) }, - getPartitionsByPartitionSet: func(partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) { - key, err := kcpcache.MetaClusterNamespaceKeyFunc(partitionSet) + // getPartitionsByPartitionSet calls the API directly instead of using an indexer + // as otherwise duplicates are created when the cache is not updated quickly enough. + getPartitionsByPartitionSet: func(ctx context.Context, partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) { + partitions, err := kcpClusterClient.Cluster(logicalcluster.From(partitionSet).Path()).TopologyV1alpha1().Partitions().List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } - return indexers.ByIndex[*topologyv1alpha1.Partition](partitionClusterInformer.Informer().GetIndexer(), indexPartitionsByPartitionSet, key) + result := []*topologyv1alpha1.Partition{} + for i := range partitions.Items { + for _, owner := range partitions.Items[i].OwnerReferences { + ownerGV, err := schema.ParseGroupVersion(owner.APIVersion) + if err != nil { + continue + } + if owner.UID == partitionSet.UID && + owner.Kind == "PartitionSet" && + ownerGV.Group == topologyv1alpha1.SchemeGroupVersion.Group { + result = append(result, &partitions.Items[i]) + break + } + } + } + return result, nil }, createPartition: func(ctx context.Context, path logicalcluster.Path, partition *topologyv1alpha1.Partition) (*topologyv1alpha1.Partition, error) { return kcpClusterClient.Cluster(path).TopologyV1alpha1().Partitions().Create(ctx, partition, metav1.CreateOptions{}) @@ -88,10 +105,6 @@ func NewController( commit: committer.NewCommitter[*PartitionSet, Patcher, *PartitionSetSpec, *PartitionSetStatus](kcpClusterClient.TopologyV1alpha1().PartitionSets()), } - indexers.AddIfNotPresentOrDie(partitionClusterInformer.Informer().GetIndexer(), cache.Indexers{ - indexPartitionsByPartitionSet: indexPartitionsByPartitionSetFunc, - }) - globalShardClusterInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -157,7 +170,7 @@ type controller struct { listShards func(selector labels.Selector) ([]*corev1alpha1.Shard, error) listPartitionSets func() ([]*topologyv1alpha1.PartitionSet, error) getPartitionSet func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.PartitionSet, error) - getPartitionsByPartitionSet func(partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) + getPartitionsByPartitionSet func(ctx context.Context, partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) createPartition func(ctx context.Context, path logicalcluster.Path, partition *topologyv1alpha1.Partition) (*topologyv1alpha1.Partition, error) deletePartition func(ctx context.Context, path logicalcluster.Path, partitionName string) error commit CommitFunc diff --git a/pkg/reconciler/topology/partitionset/partitionset_controller_test.go b/pkg/reconciler/topology/partitionset/partitionset_controller_test.go index 7518f425f9c..4a869a0ec50 100644 --- a/pkg/reconciler/topology/partitionset/partitionset_controller_test.go +++ b/pkg/reconciler/topology/partitionset/partitionset_controller_test.go @@ -226,7 +226,7 @@ func TestReconcile(t *testing.T) { }, nil }, - getPartitionsByPartitionSet: func(partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) { + getPartitionsByPartitionSet: func(ctx context.Context, partitionSet *topologyv1alpha1.PartitionSet) ([]*topologyv1alpha1.Partition, error) { var partitions []*topologyv1alpha1.Partition if tc.existingValidPartition { partition := &topologyv1alpha1.Partition{ diff --git a/pkg/reconciler/topology/partitionset/partitionset_indexes.go b/pkg/reconciler/topology/partitionset/partitionset_indexes.go deleted file mode 100644 index ee79c2dc378..00000000000 --- a/pkg/reconciler/topology/partitionset/partitionset_indexes.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2023 The KCP Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package partitionset - -import ( - "fmt" - - "github.com/kcp-dev/logicalcluster/v3" - - topologyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/topology/v1alpha1" - "github.com/kcp-dev/kcp/pkg/client" -) - -const indexPartitionsByPartitionSet = "indexPartitionsByPartitionSet" - -// indexPartitionsByPartitionSetFunc is an index function that maps a Partition to the key for its -// PartitionSet. -func indexPartitionsByPartitionSetFunc(obj interface{}) ([]string, error) { - partition, ok := obj.(*topologyv1alpha1.Partition) - if !ok { - return []string{}, fmt.Errorf("obj is supposed to be a Partition, but is %T", obj) - } - - for _, ownerRef := range partition.OwnerReferences { - if ownerRef.Kind == "PartitionSet" { - path := logicalcluster.From(partition).Path() - key := client.ToClusterAwareKey(path, ownerRef.Name) - return []string{key}, nil - } - } - return []string{}, nil -} diff --git a/pkg/reconciler/topology/partitionset/partitionset_reconcile.go b/pkg/reconciler/topology/partitionset/partitionset_reconcile.go index f99c2b02584..4dee08ae90b 100644 --- a/pkg/reconciler/topology/partitionset/partitionset_reconcile.go +++ b/pkg/reconciler/topology/partitionset/partitionset_reconcile.go @@ -77,7 +77,7 @@ func (c *controller) reconcile(ctx context.Context, partitionSet *topologyv1alph return err } - oldPartitions, err := c.getPartitionsByPartitionSet(partitionSet) + oldPartitions, err := c.getPartitionsByPartitionSet(ctx, partitionSet) if err != nil { conditions.MarkFalse( partitionSet,