From 402ea75bc09108213949ca9959e7a76c3d62807a Mon Sep 17 00:00:00 2001
From: Joaquim Moreno
Date: Thu, 28 Jul 2022 17:19:39 +0000
Subject: [PATCH 1/3] workload/v1alpha1: ToSyncTargetKey helper func

---
 pkg/apis/workload/v1alpha1/helpers.go | 23 +++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/pkg/apis/workload/v1alpha1/helpers.go b/pkg/apis/workload/v1alpha1/helpers.go
index 036c2ff34f8..221689ef9b1 100644
--- a/pkg/apis/workload/v1alpha1/helpers.go
+++ b/pkg/apis/workload/v1alpha1/helpers.go
@@ -17,12 +17,31 @@ limitations under the License.
 package v1alpha1
 
 import (
+	"crypto/sha256"
+	"math/big"
+
+	"github.com/kcp-dev/logicalcluster/v2"
+
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 // GetResourceState returns the state of the resource for the given sync target, and
 // whether the state value is a valid state. A missing label is considered invalid.
-func GetResourceState(obj metav1.Object, cluster string) (state ResourceState, valid bool) {
-	value, found := obj.GetLabels()[ClusterResourceStateLabelPrefix+cluster]
+func GetResourceState(obj metav1.Object, syncTargetKey string) (state ResourceState, valid bool) {
+	value, found := obj.GetLabels()[ClusterResourceStateLabelPrefix+syncTargetKey]
 	return ResourceState(value), found && (value == "" || ResourceState(value) == ResourceStateSync)
 }
+
+// ToSyncTargetKey hashes the SyncTarget workspace and the SyncTarget name to a string
+// that uniquely identifies the SyncTarget in annotations/labels/finalizers.
+func ToSyncTargetKey(syncTargetWorkspace logicalcluster.Name, syncTargetName string) string {
+	hash := sha256.Sum224([]byte(syncTargetWorkspace.String() + syncTargetName))
+	base62hash := toBase62(hash)
+	return base62hash
+}
+
+func toBase62(hash [28]byte) string {
+	var i big.Int
+	i.SetBytes(hash[:])
+	return i.Text(62)
+}
From bde54f809e6f151cb72142630610b7ba51812f21 Mon Sep 17 00:00:00 2001
From: Joaquim Moreno
Date: Thu, 28 Jul 2022 17:29:03 +0000
Subject: [PATCH 2/3] Wire up SyncTargetKey

---
 .../namespace_reconcile_scheduling.go         |   6 +-
 .../namespace_reconcile_scheduling_test.go    |  82 +++++-----
 .../placement_reconcile_scheduling.go         |  20 +--
 .../placement_reconcile_scheduling_test.go    |   9 +-
 pkg/syncer/shared/finalizer.go                |  13 +-
 pkg/syncer/spec/spec_controller.go            |   7 +-
 pkg/syncer/spec/spec_process.go               |  24 +--
 pkg/syncer/spec/spec_process_test.go          | 152 +++++++++---------
 pkg/syncer/status/status_controller.go        |   4 +-
 pkg/syncer/status/status_process.go           |   4 +-
 pkg/syncer/status/status_process_test.go      |  60 +++----
 pkg/syncer/syncer.go                          |   9 +-
 pkg/virtual/syncer/builder/build.go           |   5 +-
 .../syncer_apireconciler_controller.go        |   6 +-
 .../syncer_apireconciler_reconcile.go         |   4 +-
 .../e2e/reconciler/cluster/controller_test.go |  23 +--
 .../e2e/reconciler/ingress/controller_test.go |   6 +-
 .../reconciler/namespace/controller_test.go   |   8 +-
 .../reconciler/scheduling/controller_test.go  |   9 +-
 .../scheduling/multi_placements_test.go       |   4 +-
 .../scheduling/placement_scheduler_test.go    |  11 +-
 test/e2e/syncer/syncer_test.go                |  38 ++---
 22 files changed, 249 insertions(+), 255 deletions(-)

diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling.go b/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling.go
index 36c443bf596..9089eec9876 100644
--- a/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling.go
+++ b/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling.go
@@ -33,7 +33,6 @@ import (
 
 	schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
 	workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
-	placementreconciler "github.com/kcp-dev/kcp/pkg/reconciler/workload/placement"
 )
 
 const removingGracePeriod = 5 * time.Second
@@ -73,10 +72,7 @@ func (r *placementSchedulingReconciler) reconcile(ctx context.Context, ns *corev
 		if !foundScheduled {
 			continue
 		}
-
-		// TODO: location workspace should be considered also
-		_, syncTarget := placementreconciler.ParseCurrentScheduled(currentScheduled)
-		scheduledSyncTargets.Insert(syncTarget)
+		scheduledSyncTargets.Insert(currentScheduled)
 	}
 
 	// 2. find the scheduled synctarget to the ns, including synced, removing
diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling_test.go b/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling_test.go
index 30efea939e5..0a8ef73a822 100644
--- a/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling_test.go
+++ b/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling_test.go
@@ -78,7 +78,7 @@ func TestScheduling(t *testing.T) {
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 		{
@@ -87,7 +87,7 @@
 			annotations: map[string]string{
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			labels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 			placement: newPlacement("test-placement", "test-location", "test-cluster"),
 			wantPatch: false,
 			expectedAnnotations: map[string]string{
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 		{
@@ -104,16 +104,16 @@
 			annotations: map[string]string{
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			labels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 			placement: newPlacement("test-placement", "test-location", ""),
 			wantPatch: true,
 			expectedAnnotations: map[string]string{
-				schedulingv1alpha1.PlacementAnnotationKey:                                          "",
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "test-cluster": now3339,
+				schedulingv1alpha1.PlacementAnnotationKey: "",
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": now3339,
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 		{
@@ -122,46 +122,46 @@
 			annotations: map[string]string{
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			labels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 			placement: newPlacement("test-placement", "test-location", "test-cluster-2"),
 			wantPatch: true,
 			expectedAnnotations: map[string]string{
-				schedulingv1alpha1.PlacementAnnotationKey:                                          "",
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "test-cluster": now3339,
+				schedulingv1alpha1.PlacementAnnotationKey: "",
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": now3339,
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster":   string(workloadv1alpha1.ResourceStateSync),
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster-2": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQA9mRmZ5RuT9vKRZokxZTm1Yk9SqKyfOMoTEr": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 		{
 			name: "scheduled cluster is removing",
 			annotations: map[string]string{
-				schedulingv1alpha1.PlacementAnnotationKey:                                          "",
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "test-cluster": now3339,
+				schedulingv1alpha1.PlacementAnnotationKey: "",
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": now3339,
 			},
 			labels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 			placement: newPlacement("test-placement", "test-location", "test-cluster"),
 			wantPatch: false,
 			expectedAnnotations: map[string]string{
-				schedulingv1alpha1.PlacementAnnotationKey:                                          "",
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "test-cluster": now3339,
+				schedulingv1alpha1.PlacementAnnotationKey: "",
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": now3339,
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 		{
 			name: "remove clusters which is removing after grace period",
 			annotations: map[string]string{
-				schedulingv1alpha1.PlacementAnnotationKey:                                          "",
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "test-cluster": time.Now().Add(-1 * (removingGracePeriod + 1)).UTC().Format(time.RFC3339),
+				schedulingv1alpha1.PlacementAnnotationKey: "",
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": time.Now().Add(-1 * (removingGracePeriod + 1)).UTC().Format(time.RFC3339),
 			},
 			labels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "test-cluster": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync),
 			},
 			placement: newPlacement("test-placement", "test-location", ""),
 			wantPatch: true,
@@ -235,8 +235,8 @@ func TestMultiplePlacements(t *testing.T) {
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c1": string(workloadv1alpha1.ResourceStateSync),
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c2": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 		{
@@ -253,7 +253,7 @@
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c1": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 		{
@@ -266,16 +266,16 @@
 			annotations: map[string]string{
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			labels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c1": string(workloadv1alpha1.ResourceStateSync),
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c2": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": string(workloadv1alpha1.ResourceStateSync),
 			},
 			wantPatch: false,
 			expectedAnnotations: map[string]string{
 				schedulingv1alpha1.PlacementAnnotationKey: "",
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c1": string(workloadv1alpha1.ResourceStateSync),
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c2": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 		{
@@ -285,25 +285,25 @@
 				newPlacement("p2", "loc2", "c4"),
 			},
 			annotations: map[string]string{
-				schedulingv1alpha1.PlacementAnnotationKey:                                "",
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "c1": now3339,
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "c2": now3339,
+				schedulingv1alpha1.PlacementAnnotationKey: "",
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": now3339,
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": now3339,
 			},
 			labels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c1": string(workloadv1alpha1.ResourceStateSync),
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c2": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": string(workloadv1alpha1.ResourceStateSync),
 			},
 			wantPatch: true,
 			expectedAnnotations: map[string]string{
-				schedulingv1alpha1.PlacementAnnotationKey:                                "",
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "c1": now3339,
-				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "c2": now3339,
+				schedulingv1alpha1.PlacementAnnotationKey: "",
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": now3339,
+				workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": now3339,
 			},
 			expectedLabels: map[string]string{
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c1": string(workloadv1alpha1.ResourceStateSync),
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c2": string(workloadv1alpha1.ResourceStateSync),
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c3": string(workloadv1alpha1.ResourceStateSync),
-				workloadv1alpha1.ClusterResourceStateLabelPrefix + "c4": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "5iSfzYTm7pPirj6HKlmfvXMb6AuqSBxNB7vkVP": string(workloadv1alpha1.ResourceStateSync),
+				workloadv1alpha1.ClusterResourceStateLabelPrefix + "8s5f69zIcmjG486nG2jBF8BdYtgwPS7PVP1bTL": string(workloadv1alpha1.ResourceStateSync),
 			},
 		},
 	}
@@ -356,7 +356,7 @@ func newPlacement(name, location, synctarget string) *schedulingv1alpha1.Placeme
 
 	if len(synctarget) > 0 {
 		placement.Annotations = map[string]string{
-			workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: fmt.Sprintf("%s/%s", location, synctarget),
+			workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: workloadv1alpha1.ToSyncTargetKey(logicalcluster.New(""), synctarget),
 		}
 	}
diff --git a/pkg/reconciler/workload/placement/placement_reconcile_scheduling.go b/pkg/reconciler/workload/placement/placement_reconcile_scheduling.go
index 380548241f7..fc24f641944 100644
--- a/pkg/reconciler/workload/placement/placement_reconcile_scheduling.go
+++ b/pkg/reconciler/workload/placement/placement_reconcile_scheduling.go
@@ -19,9 +19,7 @@ package placement
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"math/rand"
-	"strings"
 
 	"github.com/kcp-dev/logicalcluster/v2"
 
@@ -67,15 +65,11 @@ func (r *placementSchedulingReconciler) reconcile(ctx context.Context, placement
 
 	// 2. do nothing if scheduled cluster is in the valid clusters
 	if foundScheduled && len(syncTargets) > 0 {
-		scheduledSyncTargeClusterName, scheduledSyncTargeName := ParseCurrentScheduled(currentScheduled)
 		for _, syncTarget := range syncTargets {
-			if syncTargetClusterName != scheduledSyncTargeClusterName {
+			syncTargetKey := workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(syncTarget), syncTarget.Name)
+			if syncTargetKey != currentScheduled {
 				continue
 			}
-			if scheduledSyncTargeName != syncTarget.Name {
-				continue
-			}
-
 			return reconcileStatusContinue, placement, nil
 		}
 	}
@@ -86,7 +80,7 @@ func (r *placementSchedulingReconciler) reconcile(ctx context.Context, placement
 	// to be exclusive.
 	if len(syncTargets) > 0 {
 		scheduledSyncTarget := syncTargets[rand.Intn(len(syncTargets))]
-		expectedAnnotations[workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey] = fmt.Sprintf("%s/%s", syncTargetClusterName.String(), scheduledSyncTarget.Name)
+		expectedAnnotations[workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey] = workloadv1alpha1.ToSyncTargetKey(syncTargetClusterName, scheduledSyncTarget.Name)
 		updated, err := r.patchPlacementAnnotation(ctx, clusterName, placement, expectedAnnotations)
 		return reconcileStatusContinue, updated, err
 	}
@@ -147,11 +141,3 @@ func (r *placementSchedulingReconciler) patchPlacementAnnotation(ctx context.Con
 	}
 	return updated, nil
 }
-
-func ParseCurrentScheduled(value string) (logicalcluster.Name, string) {
-	if len(value) == 0 {
-		return logicalcluster.Name{}, ""
-	}
-	comps := strings.SplitN(value, "/", 2)
-	return logicalcluster.New(comps[0]), comps[1]
-}
diff --git a/pkg/reconciler/workload/placement/placement_reconcile_scheduling_test.go b/pkg/reconciler/workload/placement/placement_reconcile_scheduling_test.go
index 12a03950618..e9e6ca3d20c 100644
--- a/pkg/reconciler/workload/placement/placement_reconcile_scheduling_test.go
+++ b/pkg/reconciler/workload/placement/placement_reconcile_scheduling_test.go
@@ -19,7 +19,6 @@ package placement
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"testing"
 
 	jsonpatch "github.com/evanphx/json-patch"
@@ -64,7 +63,7 @@ func TestSchedulingReconcile(t *testing.T) {
 			syncTargets: []*workloadv1alpha1.SyncTarget{newSyncTarget("c1", true)},
 			wantPatch: true,
 			expectedAnnotations: map[string]string{
-				workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: "/c1",
+				workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa",
 			},
 		},
 		{
@@ -73,7 +72,7 @@
 			location: newLocation("test-location"),
 			syncTargets: []*workloadv1alpha1.SyncTarget{newSyncTarget("c1", true)},
 			expectedAnnotations: map[string]string{
-				workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: "/c1",
+				workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa",
 			},
 		},
 		{
@@ -91,7 +90,7 @@
 			syncTargets: []*workloadv1alpha1.SyncTarget{newSyncTarget("c1", false), newSyncTarget("c2", true)},
 			wantPatch: true,
 			expectedAnnotations: map[string]string{
-				workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: "/c2",
+				workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4",
 			},
 		},
 	}
@@ -154,7 +153,7 @@ func newPlacement(name, location, synctarget string) *schedulingv1alpha1.Placeme
 
 	if len(synctarget) > 0 {
 		placement.Annotations = map[string]string{
-			workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: fmt.Sprintf("/%s", synctarget),
+			workloadv1alpha1.InternalSyncTargetPlacementAnnotationKey: workloadv1alpha1.ToSyncTargetKey(logicalcluster.New(""), synctarget),
 		}
 	}
diff --git a/pkg/syncer/shared/finalizer.go b/pkg/syncer/shared/finalizer.go
index 7bfe5b714ac..671d6fc7a5b 100644
--- a/pkg/syncer/shared/finalizer.go
+++ b/pkg/syncer/shared/finalizer.go
@@ -39,7 +39,7 @@ const (
 	SyncerFinalizerNamePrefix = "workload.kcp.dev/syncer-"
 )
 
-func EnsureUpstreamFinalizerRemoved(ctx context.Context, gvr schema.GroupVersionResource, upstreamInformers dynamicinformer.DynamicSharedInformerFactory, upstreamClient dynamic.ClusterInterface, upstreamNamespace, syncTargetName string, logicalClusterName logicalcluster.Name, resourceName string) error {
+func EnsureUpstreamFinalizerRemoved(ctx context.Context, gvr schema.GroupVersionResource, upstreamInformers dynamicinformer.DynamicSharedInformerFactory, upstreamClient dynamic.ClusterInterface, upstreamNamespace, syncTargetKey string, logicalClusterName logicalcluster.Name, resourceName string) error {
 	upstreamObjFromLister, err := upstreamInformers.ForResource(gvr).Lister().ByNamespace(upstreamNamespace).Get(clusters.ToClusterAwareKey(logicalClusterName, resourceName))
 	if err != nil && !apierrors.IsNotFound(err) {
 		return err
@@ -55,7 +55,7 @@ func EnsureUpstreamFinalizerRemoved(
 	}
 
 	// TODO(jmprusi): This check will need to be against "GetDeletionTimestamp()" when using the syncer virtual workspace.
-	if upstreamObj.GetAnnotations()[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+syncTargetName] == "" {
+	if upstreamObj.GetAnnotations()[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+syncTargetKey] == "" {
 		// Do nothing: the object should not be deleted anymore for this location on the KCP side
 		return nil
 	}
@@ -66,7 +66,7 @@
 	currentFinalizers := upstreamObj.GetFinalizers()
 	desiredFinalizers := []string{}
 	for _, finalizer := range currentFinalizers {
-		if finalizer != SyncerFinalizerNamePrefix+syncTargetName {
+		if finalizer != SyncerFinalizerNamePrefix+syncTargetKey {
 			desiredFinalizers = append(desiredFinalizers, finalizer)
 		}
 	}
@@ -77,14 +77,13 @@
 	// - Begin -
 	// Clean up the status annotation and the locationDeletionAnnotation.
 	annotations := upstreamObj.GetAnnotations()
-	delete(annotations, workloadv1alpha1.InternalClusterStatusAnnotationPrefix+syncTargetName)
-	delete(annotations, workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+syncTargetName)
-	delete(annotations, workloadv1alpha1.InternalClusterStatusAnnotationPrefix+syncTargetName)
+	delete(annotations, workloadv1alpha1.InternalClusterStatusAnnotationPrefix+syncTargetKey)
+	delete(annotations, workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+syncTargetKey)
 	upstreamObj.SetAnnotations(annotations)
 
 	// remove the cluster label.
 	upstreamLabels := upstreamObj.GetLabels()
-	delete(upstreamLabels, workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetName)
+	delete(upstreamLabels, workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey)
 	upstreamObj.SetLabels(upstreamLabels)
 
 	// - End of block to be removed once the virtual workspace syncer is integrated -
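[Editorial illustration, not part of the patch series.] With the key in hand, every syncer-facing marker is a plain prefix+key concatenation. A sketch of the names this series stamps on upstream objects, matching the prefixes visible in the test fixtures below:

	package main

	import "fmt"

	func main() {
		// Assume key was produced by workloadv1alpha1.ToSyncTargetKey as in PATCH 1;
		// this value is the fixture key used by the spec/status tests below.
		key := "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"

		fmt.Println("workload.kcp.dev/syncer-" + key)                 // syncer finalizer
		fmt.Println("state.workload.kcp.dev/" + key)                  // resource state label
		fmt.Println("deletion.internal.workload.kcp.dev/" + key)      // deletion timestamp annotation
		fmt.Println("finalizers.workload.kcp.dev/" + key)             // external-actor finalizer annotation
		fmt.Println("experimental.spec-diff.workload.kcp.dev/" + key) // spec-diff override annotation
	}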
diff --git a/pkg/syncer/spec/spec_controller.go b/pkg/syncer/spec/spec_controller.go
index 2bb3c65f9d8..8ce661a2bd6 100644
--- a/pkg/syncer/spec/spec_controller.go
+++ b/pkg/syncer/spec/spec_controller.go
@@ -60,10 +60,11 @@ type Controller struct {
 	syncTargetName            string
 	syncTargetWorkspace       logicalcluster.Name
 	syncTargetUID             types.UID
+	syncTargetKey             string
 	advancedSchedulingEnabled bool
 }
 
-func NewSpecSyncer(gvrs []schema.GroupVersionResource, syncTargetWorkspace logicalcluster.Name, syncTargetName string, upstreamURL *url.URL, advancedSchedulingEnabled bool,
+func NewSpecSyncer(gvrs []schema.GroupVersionResource, syncTargetWorkspace logicalcluster.Name, syncTargetName, syncTargetKey string, upstreamURL *url.URL, advancedSchedulingEnabled bool,
 	upstreamClient dynamic.ClusterInterface, downstreamClient dynamic.Interface,
 	upstreamInformers, downstreamInformers dynamicinformer.DynamicSharedInformerFactory, syncTargetUID types.UID) (*Controller, error) {
 	c := Controller{
@@ -77,6 +78,7 @@ func NewSpecSyncer(gvrs []schema.GroupVersionResource, syncTargetWorkspace logic
 		syncTargetName:            syncTargetName,
 		syncTargetWorkspace:       syncTargetWorkspace,
 		syncTargetUID:             syncTargetUID,
+		syncTargetKey:             syncTargetKey,
 		advancedSchedulingEnabled: advancedSchedulingEnabled,
 	}
 
@@ -111,7 +113,8 @@ func NewSpecSyncer(gvrs []schema.GroupVersionResource, syncTargetWorkspace logic
 				c.AddToQueue(gvr, obj)
 			},
 		})
-		klog.V(2).InfoS("Set up upstream informer", "syncTarget_workspace", syncTargetWorkspace, "synctarget_name", syncTargetName, "gvr", gvr.String())
+
+		klog.V(2).InfoS("Set up upstream informer", "syncTargetWorkspace", syncTargetWorkspace, "syncTargetName", syncTargetName, "syncTargetKey", syncTargetKey, "gvr", gvr.String())
 
 		downstreamInformers.ForResource(gvr).Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 			DeleteFunc: func(obj interface{}) {
diff --git a/pkg/syncer/spec/spec_process.go b/pkg/syncer/spec/spec_process.go
index eac2291f0d0..d07c668136c 100644
--- a/pkg/syncer/spec/spec_process.go
+++ b/pkg/syncer/spec/spec_process.go
@@ -209,7 +209,7 @@ func (c *Controller) ensureDownstreamNamespaceExists(ctx context.Context, downst
 	if upstreamObj.GetLabels() != nil {
 		newNamespace.SetLabels(map[string]string{
 			// TODO: this should be set once at syncer startup and propagated around everywhere.
-			workloadv1alpha1.InternalDownstreamClusterLabel: c.syncTargetName,
+			workloadv1alpha1.InternalDownstreamClusterLabel: c.syncTargetKey,
 		})
 	}
 
@@ -245,17 +245,17 @@ func (c *Controller) ensureSyncerFinalizer(ctx context.Context, gvr schema.Group
 	upstreamFinalizers := upstreamObj.GetFinalizers()
 	hasFinalizer := false
 	for _, finalizer := range upstreamFinalizers {
-		if finalizer == shared.SyncerFinalizerNamePrefix+c.syncTargetName {
+		if finalizer == shared.SyncerFinalizerNamePrefix+c.syncTargetKey {
 			hasFinalizer = true
 		}
 	}
 
 	// TODO(davidfestal): When using syncer virtual workspace we would check the DeletionTimestamp on the upstream object, instead of the DeletionTimestamp annotation,
 	// as the virtual workspace will set the the deletionTimestamp() on the location view by a transformation.
-	intendedToBeRemovedFromLocation := upstreamObj.GetAnnotations()[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+c.syncTargetName] != ""
+	intendedToBeRemovedFromLocation := upstreamObj.GetAnnotations()[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+c.syncTargetKey] != ""
 
 	// TODO(davidfestal): When using syncer virtual workspace this condition would not be necessary anymore, since directly tested on the virtual workspace side.
-	stillOwnedByExternalActorForLocation := upstreamObj.GetAnnotations()[workloadv1alpha1.ClusterFinalizerAnnotationPrefix+c.syncTargetName] != ""
+	stillOwnedByExternalActorForLocation := upstreamObj.GetAnnotations()[workloadv1alpha1.ClusterFinalizerAnnotationPrefix+c.syncTargetKey] != ""
 
 	if !hasFinalizer && (!intendedToBeRemovedFromLocation || stillOwnedByExternalActorForLocation) {
 		upstreamObjCopy := upstreamObj.DeepCopy()
@@ -263,7 +263,7 @@ func (c *Controller) ensureSyncerFinalizer(ctx context.Context, gvr schema.Group
 		namespace := upstreamObjCopy.GetNamespace()
 		logicalCluster := logicalcluster.From(upstreamObjCopy)
 
-		upstreamFinalizers = append(upstreamFinalizers, shared.SyncerFinalizerNamePrefix+c.syncTargetName)
+		upstreamFinalizers = append(upstreamFinalizers, shared.SyncerFinalizerNamePrefix+c.syncTargetKey)
 		upstreamObjCopy.SetFinalizers(upstreamFinalizers)
 		if _, err := c.upstreamClient.Cluster(logicalCluster).Resource(gvr).Namespace(namespace).Update(ctx, upstreamObjCopy, metav1.UpdateOptions{}); err != nil {
 			klog.Errorf("Failed adding finalizer upstream on resource %s|%s/%s: %v", logicalCluster, namespace, name, err)
@@ -308,12 +308,12 @@ func (c *Controller) applyToDownstream(ctx context.Context, gvr schema.GroupVers
 	// replace upstream state label with downstream cluster label. We don't want to leak upstream state machine
 	// state to downstream, and also we don't need downstream updates every time the upstream state machine changes.
 	labels := downstreamObj.GetLabels()
-	delete(labels, workloadv1alpha1.ClusterResourceStateLabelPrefix+c.syncTargetName)
-	labels[workloadv1alpha1.InternalDownstreamClusterLabel] = c.syncTargetName
+	delete(labels, workloadv1alpha1.ClusterResourceStateLabelPrefix+c.syncTargetKey)
+	labels[workloadv1alpha1.InternalDownstreamClusterLabel] = c.syncTargetKey
 	downstreamObj.SetLabels(labels)
 
 	if c.advancedSchedulingEnabled {
-		specDiffPatch := upstreamObj.GetAnnotations()[workloadv1alpha1.ClusterSpecDiffAnnotationPrefix+c.syncTargetName]
+		specDiffPatch := upstreamObj.GetAnnotations()[workloadv1alpha1.ClusterSpecDiffAnnotationPrefix+c.syncTargetKey]
 		if specDiffPatch != "" {
 			upstreamSpec, specExists, err := unstructured.NestedFieldCopy(upstreamObj.UnstructuredContent(), "spec")
 			if err != nil {
@@ -349,17 +349,17 @@ func (c *Controller) applyToDownstream(ctx context.Context, gvr schema.GroupVers
 	// TODO(jmprusi): When using syncer virtual workspace we would check the DeletionTimestamp on the upstream object, instead of the DeletionTimestamp annotation,
 	// as the virtual workspace will set the the deletionTimestamp() on the location view by a transformation.
-	intendedToBeRemovedFromLocation := upstreamObj.GetAnnotations()[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+c.syncTargetName] != ""
+	intendedToBeRemovedFromLocation := upstreamObj.GetAnnotations()[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+c.syncTargetKey] != ""
 
 	// TODO(jmprusi): When using syncer virtual workspace this condition would not be necessary anymore, since directly tested on the virtual workspace side.
-	stillOwnedByExternalActorForLocation := upstreamObj.GetAnnotations()[workloadv1alpha1.ClusterFinalizerAnnotationPrefix+c.syncTargetName] != ""
-
+	stillOwnedByExternalActorForLocation := upstreamObj.GetAnnotations()[workloadv1alpha1.ClusterFinalizerAnnotationPrefix+c.syncTargetKey] != ""
+
 	klog.V(4).Infof("Upstream object %s|%s/%s is intended to be removed %t and owned by external actor is: %t", upstreamObjLogicalCluster, upstreamObj.GetNamespace(), upstreamObj.GetName(), intendedToBeRemovedFromLocation, stillOwnedByExternalActorForLocation)
 	if intendedToBeRemovedFromLocation && !stillOwnedByExternalActorForLocation {
 		if err := c.downstreamClient.Resource(gvr).Namespace(downstreamNamespace).Delete(ctx, downstreamObj.GetName(), metav1.DeleteOptions{}); err != nil {
 			if apierrors.IsNotFound(err) {
 				// That's not an error.
 				// Just think about removing the finalizer from the KCP location-specific resource:
-				if err := shared.EnsureUpstreamFinalizerRemoved(ctx, gvr, c.upstreamInformers, c.upstreamClient, upstreamObj.GetNamespace(), c.syncTargetName, upstreamObjLogicalCluster, upstreamObj.GetName()); err != nil {
+				if err := shared.EnsureUpstreamFinalizerRemoved(ctx, gvr, c.upstreamInformers, c.upstreamClient, upstreamObj.GetNamespace(), c.syncTargetKey, upstreamObjLogicalCluster, upstreamObj.GetName()); err != nil {
 					return err
 				}
 				return nil
diff --git a/pkg/syncer/spec/spec_process_test.go b/pkg/syncer/spec/spec_process_test.go
index d639c0af563..be818e77b12 100644
--- a/pkg/syncer/spec/spec_process_test.go
+++ b/pkg/syncer/spec/spec_process_test.go
@@ -471,6 +471,7 @@ func TestSyncerProcess(t *testing.T) {
 		upstreamURL                         string
 		upstreamLogicalCluster              string
 		syncTargetName                      string
+		syncTargetWorkspace                 logicalcluster.Name
 		syncTargetUID                       types.UID
 		advancedSchedulingEnabled           bool
 
 		"SpecSyncer sync deployment to downstream, upstream gets patched with the finalizer and the object is not created downstream (will be in the next reconciliation)": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"internal.workload.kcp.dev/cluster": "us-west1",
+				"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token":     []byte("token"),
 						"namespace": []byte("namespace"),
 					}),
 				deployment("theDeployment", "test", "root:org:ws", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				}, nil, nil),
 			},
 			resourceToProcessLogicalClusterName: "root:org:ws",
@@ -504,8 +505,8 @@
 				updateDeploymentAction("test",
 					toUnstructured(t, changeDeployment(
 						deployment("theDeployment", "test", "root:org:ws", map[string]string{
-							"state.workload.kcp.dev/us-west1": "Sync",
-						}, nil, []string{"workload.kcp.dev/syncer-us-west1"}),
+							"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
+						}, nil, []string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 					))),
 			},
 			expectActionsOnTo: []clienttesting.Action{
@@ -514,7 +515,7 @@
 					changeUnstructured(
 						toUnstructured(t, namespace("kcp-hcbsa8z6c2er", "", map[string]string{
-							"internal.workload.kcp.dev/cluster": "us-west1",
+							"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 						},
 							map[string]string{
 								"kcp.dev/namespace-locator": `{"syncTarget":{"workspace":"root:org:ws","name":"us-west1","uid":"syncTargetUID"},"workspace":"root:org:ws","namespace":"test"}`,
 							})),
 					),
 				),
 			},
 		},
 		"SpecSyncer sync to downstream, syncer finalizer already there": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"state.workload.kcp.dev/us-west1": "Sync",
+				"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token":     []byte("token"),
 						"namespace": []byte("namespace"),
 					}),
 				deployment("theDeployment", "test", "root:org:ws", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
-				}, nil, []string{"workload.kcp.dev/syncer-us-west1"}),
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
+				}, nil, []string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 			},
 			resourceToProcessLogicalClusterName: "root:org:ws",
 			resourceToProcessName:               "theDeployment",
@@ -553,7 +554,7 @@
 					changeUnstructured(
 						toUnstructured(t, namespace("kcp-hcbsa8z6c2er", "", map[string]string{
-							"internal.workload.kcp.dev/cluster": "us-west1",
+							"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 						},
 							map[string]string{
 								"kcp.dev/namespace-locator": `{"syncTarget":{"workspace":"root:org:ws","name":"us-west1","uid":"syncTargetUID"},"workspace":"root:org:ws","namespace":"test"}`,
 							})),
 					),
 				),
@@ -568,7 +569,7 @@
 					toJson(t,
 						changeUnstructured(
 							toUnstructured(t, deployment("theDeployment", "kcp-hcbsa8z6c2er", "", map[string]string{
-								"internal.workload.kcp.dev/cluster": "us-west1",
+								"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 							}, nil, nil)),
 							setNestedField(map[string]interface{}{}, "status"),
 							setPodSpecServiceAccount("spec", "template", "spec"),
 						),
 					),
 				),
 			},
 		},
 		"SpecSyncer upstream resource has the state workload annotation removed, expect deletion downstream": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"state.workload.kcp.dev/us-west1": "Sync",
+				"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token": []byte("token"),
@@ -608,32 +609,32 @@
 		"SpecSyncer deletion: object exist downstream": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"state.workload.kcp.dev/us-west1": "Sync",
+				"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			toResources: []runtime.Object{
 				namespace("kcp-hcbsa8z6c2er", "", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				},
 					map[string]string{
 						"kcp.dev/namespace-locator": `{"syncTarget":{"workspace":"root:org:ws","name":"us-west1","uid":"syncTargetUID"},"workspace":"root:org:ws","namespace":"test"}`,
 					}),
 				deployment("theDeployment", "kcp-hcbsa8z6c2er", "root:org:ws", map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
-				}, nil, []string{"workload.kcp.dev/syncer-us-west1"}),
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
+				}, nil, []string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 			},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token":     []byte("token"),
 						"namespace": []byte("namespace"),
 					}),
 				deployment("theDeployment", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
-					map[string]string{"deletion.internal.workload.kcp.dev/us-west1": time.Now().Format(time.RFC3339)},
-					[]string{"workload.kcp.dev/syncer-us-west1"}),
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
+					map[string]string{"deletion.internal.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": time.Now().Format(time.RFC3339)},
+					[]string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 			},
 			resourceToProcessLogicalClusterName: "root:org:ws",
 			resourceToProcessName:               "theDeployment",
@@ -650,12 +651,12 @@
 		"SpecSyncer deletion: object does not exists downstream, upstream finalizer should be removed": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"internal.workload.kcp.dev/cluster": "us-west1",
+				"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			toResources: []runtime.Object{
 				namespace("kcp-hcbsa8z6c2er", "", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				},
 					map[string]string{
 						"kcp.dev/namespace-locator": `{"syncTarget":{"workspace":"root:org:ws","name":"us-west1","uid":"syncTargetUID"},"workspace":"root:org:ws","namespace":"test"}`,
 					}),
 			},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token":     []byte("token"),
 						"namespace": []byte("namespace"),
 					}),
 				deployment("theDeployment", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"another.valid.annotation/this": "value",
-						"deletion.internal.workload.kcp.dev/us-west1": time.Now().Format(time.RFC3339)},
-					[]string{"workload.kcp.dev/syncer-us-west1"}),
+						"deletion.internal.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": time.Now().Format(time.RFC3339)},
+					[]string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 			},
 			resourceToProcessLogicalClusterName: "root:org:ws",
 			resourceToProcessName:               "theDeployment",
@@ -702,36 +703,36 @@
 		"SpecSyncer deletion: upstream object has external finalizer, the object shouldn't be deleted": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"state.workload.kcp.dev/us-west1": "Sync",
+				"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			toResources: []runtime.Object{
 				namespace("kcp-hcbsa8z6c2er", "", map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
-					"state.workload.kcp.dev/us-west1":   "Sync",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				},
 					map[string]string{
 						"kcp.dev/namespace-locator": `{"syncTarget":{"workspace":"root:org:ws","name":"us-west1","uid":"syncTargetUID"},"workspace":"root:org:ws","namespace":"test"}`,
 					}),
 				deployment("theDeployment", "kcp-hcbsa8z6c2er", "", map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 				}, nil, nil),
 			},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token":     []byte("token"),
 						"namespace": []byte("namespace"),
 					}),
 				deployment("theDeployment", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{
-						"deletion.internal.workload.kcp.dev/us-west1": time.Now().Format(time.RFC3339),
-						"finalizers.workload.kcp.dev/us-west1":        "another-controller-finalizer",
+						"deletion.internal.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": time.Now().Format(time.RFC3339),
+						"finalizers.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5":        "another-controller-finalizer",
 					},
-					[]string{"workload.kcp.dev/syncer-us-west1"}),
+					[]string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 			},
 			resourceToProcessLogicalClusterName: "root:org:ws",
 			resourceToProcessName:               "theDeployment",
@@ -746,10 +747,10 @@ func TestSyncerProcess(t *testing.T) {
 					toJson(t,
 						changeUnstructured(
 							toUnstructured(t, deployment("theDeployment", "kcp-hcbsa8z6c2er", "", map[string]string{
-								"internal.workload.kcp.dev/cluster": "us-west1",
+								"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 							}, map[string]string{
-								"deletion.internal.workload.kcp.dev/us-west1": time.Now().Format(time.RFC3339),
-								"finalizers.workload.kcp.dev/us-west1":        "another-controller-finalizer",
+								"deletion.internal.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": time.Now().Format(time.RFC3339),
+								"finalizers.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5":        "another-controller-finalizer",
 							}, nil)),
 							// TODO(jmprusi): Those next changes do "nothing", it's just for the test to pass
 							// as the test expects some null fields to be there...
@@ -773,12 +774,12 @@
 		"SpecSyncer with AdvancedScheduling, sync deployment to downstream and apply SpecDiff": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"internal.workload.kcp.dev/cluster": "us-west1",
+				"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token": []byte("token"),
@@ -786,10 +787,10 @@
 					}),
 				deployment("theDeployment", "test", "root:org:ws", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				},
-					map[string]string{"experimental.spec-diff.workload.kcp.dev/us-west1": "[{\"op\":\"replace\",\"path\":\"/replicas\",\"value\":3}]"},
-					[]string{"workload.kcp.dev/syncer-us-west1"}),
+					map[string]string{"experimental.spec-diff.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "[{\"op\":\"replace\",\"path\":\"/replicas\",\"value\":3}]"},
+					[]string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 			},
 			resourceToProcessLogicalClusterName: "root:org:ws",
 			resourceToProcessName:               "theDeployment",
@@ -803,7 +804,7 @@
 					changeUnstructured(
 						toUnstructured(t, namespace("kcp-hcbsa8z6c2er", "", map[string]string{
-							"internal.workload.kcp.dev/cluster": "us-west1",
+							"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 						},
 							map[string]string{
 								"kcp.dev/namespace-locator": `{"syncTarget":{"workspace":"root:org:ws","name":"us-west1","uid":"syncTargetUID"},"workspace":"root:org:ws","namespace":"test"}`,
 							})),
 					),
 				),
@@ -818,8 +819,8 @@
 					toJson(t,
 						changeUnstructured(
 							toUnstructured(t, deployment("theDeployment", "kcp-hcbsa8z6c2er", "", map[string]string{
-								"internal.workload.kcp.dev/cluster": "us-west1",
-							}, map[string]string{"experimental.spec-diff.workload.kcp.dev/us-west1": "[{\"op\":\"replace\",\"path\":\"/replicas\",\"value\":3}]"}, nil)),
+								"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
+							}, map[string]string{"experimental.spec-diff.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "[{\"op\":\"replace\",\"path\":\"/replicas\",\"value\":3}]"}, nil)),
 							setNestedField(map[string]interface{}{
 								"replicas": int64(3),
 							}, "spec"),
@@ -844,25 +845,25 @@
 		"SpecSyncer namespace conflict: try to sync to an already existing namespace with a different namespace-locator, expect error": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"internal.workload.kcp.dev/cluster": "us-west1",
+				"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token":     []byte("token"),
 						"namespace": []byte("namespace"),
 					}),
 				deployment("theDeployment", "test", "root:org:ws", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
-				}, nil, []string{"workload.kcp.dev/syncer-us-west1"}),
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
+				}, nil, []string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 			},
 			toResources: []runtime.Object{
 				namespace("kcp-hcbsa8z6c2er", "", map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
-					"state.workload.kcp.dev/us-west1":   "Sync",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				}, map[string]string{
 					"kcp.dev/namespace-locator": `{"syncTarget":{"workspace":"root:org:ws","name":"us-west1","uid":"syncTargetUID"},"workspace":"root:org:ws","namespace":"ANOTHERNAMESPACE"}`,
 				}),
@@ -877,25 +878,25 @@
 		"SpecSyncer namespace conflict: try to sync to an already existing namespace without a namespace-locator, expect error": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"state.workload.kcp.dev/us-west1": "Sync",
+				"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResources: []runtime.Object{
 				secret("default-token-abc", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					map[string]string{"kubernetes.io/service-account.name": "default"},
 					map[string][]byte{
 						"token":     []byte("token"),
 						"namespace": []byte("namespace"),
 					}),
 				deployment("theDeployment", "test", "root:org:ws", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
-				}, nil, []string{"workload.kcp.dev/syncer-us-west1"}),
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
+				}, nil, []string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}),
 			},
 			toResources: []runtime.Object{
 				namespace("kcp-hcbsa8z6c2er", "", map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
-					"state.workload.kcp.dev/us-west1":   "Sync",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				}, map[string]string{},
 				),
 			},
@@ -909,18 +910,18 @@
 		"old v0.6.0 namespace locator exists downstream": {
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("test", "root:org:ws", map[string]string{
-				"state.workload.kcp.dev/us-west1": "Sync",
+				"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 			}, nil),
 			gvr: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"},
 			toResources: []runtime.Object{
-				namespace("kcp-0123456789", "", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
+				namespace("kcp-01c0zzvlqsi7n", "", map[string]string{
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				},
 					map[string]string{
 						"kcp.dev/namespace-locator": `{"syncTarget":{"path":"root:org:ws","name":"us-west1","uid":"syncTargetUID"},"workspace":"root:org:ws","namespace":"test"}`,
 					}),
 				secret("foo", "test", "root:org:ws",
-					map[string]string{"state.workload.kcp.dev/us-west1": "Sync"},
+					map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"},
 					nil,
 					map[string][]byte{
 						"a": []byte("b"),
@@ -929,10 +930,10 @@
 			fromResources: []runtime.Object{
 				secretWithFinalizers("foo", "test", "root:org:ws",
 					map[string]string{
-						"state.workload.kcp.dev/us-west1": "Sync",
-						"something":                       "else"},
+						"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
+						"something": "else"},
 					nil,
-					[]string{"workload.kcp.dev/syncer-us-west1"},
+					[]string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"},
 					map[string][]byte{
 						"a": []byte("b"),
 					}),
@@ -945,9 +946,9 @@
 			expectActionsOnTo: []clienttesting.Action{
 				patchSecretAction(
 					"foo",
-					"kcp-0123456789",
+					"kcp-01c0zzvlqsi7n",
 					types.ApplyPatchType,
-					[]byte(`{"apiVersion":"v1","data":{"a":"Yg=="},"kind":"Secret","metadata":{"creationTimestamp":null,"labels":{"internal.workload.kcp.dev/cluster":"us-west1","something":"else"},"name":"foo","namespace":"kcp-0123456789"},"type":"kubernetes.io/service-account-token"}`),
+					[]byte(`{"apiVersion":"v1","data":{"a":"Yg=="},"kind":"Secret","metadata":{"creationTimestamp":null,"labels":{"internal.workload.kcp.dev/cluster":"2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5","something":"else"},"name":"foo","namespace":"kcp-01c0zzvlqsi7n"},"type":"kubernetes.io/service-account-token"}`),
				),
 			},
 		},
@@ -964,6 +965,10 @@
 				syncTargetUID = types.UID("syncTargetUID")
 			}
 
+			if tc.syncTargetWorkspace.Empty() {
+				tc.syncTargetWorkspace = logicalcluster.New("root:org:ws")
+			}
+
 			var allFromResources []runtime.Object
 			allFromResources = append(allFromResources, tc.fromNamespace)
 			if tc.fromResources != nil {
@@ -975,13 +980,14 @@
 				client: fromClient,
 			}
 
-			toClient := dynamicfake.NewSimpleDynamicClient(scheme, tc.toResources...)
+			syncTargetKey := workloadv1alpha1.ToSyncTargetKey(tc.syncTargetWorkspace, tc.syncTargetName)
+			toClient := dynamicfake.NewSimpleDynamicClient(scheme, tc.toResources...)
 			fromInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(fromClusterClient.Cluster(logicalcluster.Wildcard), time.Hour, metav1.NamespaceAll, func(o *metav1.ListOptions) {
-				o.LabelSelector = workloadv1alpha1.ClusterResourceStateLabelPrefix + tc.syncTargetName + "=" + string(workloadv1alpha1.ResourceStateSync)
+				o.LabelSelector = workloadv1alpha1.ClusterResourceStateLabelPrefix + syncTargetKey + "=" + string(workloadv1alpha1.ResourceStateSync)
 			})
 			toInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(toClient, time.Hour, metav1.NamespaceAll, func(o *metav1.ListOptions) {
-				o.LabelSelector = workloadv1alpha1.ClusterResourceStateLabelPrefix + tc.syncTargetName + "=" + string(workloadv1alpha1.ResourceStateSync)
+				o.LabelSelector = workloadv1alpha1.ClusterResourceStateLabelPrefix + syncTargetKey + "=" + string(workloadv1alpha1.ResourceStateSync)
 			})
 
 			setupServersideApplyPatchReactor(toClient)
@@ -995,7 +1001,7 @@
 			}
 			upstreamURL, err := url.Parse("https://kcp.dev:6443")
 			require.NoError(t, err)
-			controller, err := NewSpecSyncer(gvrs, kcpLogicalCluster, tc.syncTargetName, upstreamURL, tc.advancedSchedulingEnabled, fromClusterClient, toClient, fromInformers, toInformers, syncTargetUID)
+			controller, err := NewSpecSyncer(gvrs, kcpLogicalCluster, tc.syncTargetName, syncTargetKey, upstreamURL, tc.advancedSchedulingEnabled, fromClusterClient, toClient, fromInformers, toInformers, syncTargetUID)
 			require.NoError(t, err)
 
 			fromInformers.Start(ctx.Done())
diff --git a/pkg/syncer/status/status_controller.go b/pkg/syncer/status/status_controller.go
index a605d0bfb42..02aef00d102 100644
--- a/pkg/syncer/status/status_controller.go
+++ b/pkg/syncer/status/status_controller.go
@@ -52,10 +52,11 @@ type Controller struct {
 	syncTargetName            string
 	syncTargetWorkspace       logicalcluster.Name
 	syncTargetUID             types.UID
+	syncTargetKey             string
 	advancedSchedulingEnabled bool
 }
 
-func NewStatusSyncer(gvrs []schema.GroupVersionResource, syncTargetWorkspace logicalcluster.Name, syncTargetName string, advancedSchedulingEnabled bool,
+func NewStatusSyncer(gvrs []schema.GroupVersionResource, syncTargetWorkspace logicalcluster.Name, syncTargetName, syncTargetKey string, advancedSchedulingEnabled bool,
 	upstreamClient dynamic.ClusterInterface, downstreamClient dynamic.Interface,
 	upstreamInformers, downstreamInformers dynamicinformer.DynamicSharedInformerFactory, syncTargetUID types.UID) (*Controller, error) {
 	c := &Controller{
@@ -70,6 +71,7 @@ func NewStatusSyncer(gvrs []schema.GroupVersionResource, syncTargetWorkspace log
 		syncTargetName:            syncTargetName,
 		syncTargetWorkspace:       syncTargetWorkspace,
 		syncTargetUID:             syncTargetUID,
+		syncTargetKey:             syncTargetKey,
 		advancedSchedulingEnabled: advancedSchedulingEnabled,
 	}
diff --git a/pkg/syncer/status/status_process.go b/pkg/syncer/status/status_process.go
index 60a627f8dab..c00628fd1d4 100644
--- a/pkg/syncer/status/status_process.go
+++ b/pkg/syncer/status/status_process.go
@@ -101,7 +101,7 @@ func (c *Controller) process(ctx context.Context, gvr schema.GroupVersionResourc
 	}
 	if !exists {
 		klog.InfoS("Downstream GVR %q object %s|%s/%s does not exist. Removing finalizer upstream", gvr.String(), downstreamClusterName, upstreamNamespace, name)
-		return shared.EnsureUpstreamFinalizerRemoved(ctx, gvr, c.upstreamInformers, c.upstreamClient, upstreamNamespace, c.syncTargetName, upstreamWorkspace, name)
+		return shared.EnsureUpstreamFinalizerRemoved(ctx, gvr, c.upstreamInformers, c.upstreamClient, upstreamNamespace, c.syncTargetKey, upstreamWorkspace, name)
 	}
 
 	// update upstream status
@@ -146,7 +146,7 @@ func (c *Controller) updateStatusInUpstream(ctx context.Context, gvr schema.Grou
 	if newUpstreamAnnotations == nil {
 		newUpstreamAnnotations = make(map[string]string)
 	}
-	newUpstreamAnnotations[workloadv1alpha1.InternalClusterStatusAnnotationPrefix+c.syncTargetName] = string(statusAnnotationValue)
+	newUpstreamAnnotations[workloadv1alpha1.InternalClusterStatusAnnotationPrefix+c.syncTargetKey] = string(statusAnnotationValue)
 	newUpstream.SetAnnotations(newUpstreamAnnotations)
 
 	if reflect.DeepEqual(existing, newUpstream) {
diff --git a/pkg/syncer/status/status_process_test.go b/pkg/syncer/status/status_process_test.go
index 905f81a2695..a4f4927c68f 100644
--- a/pkg/syncer/status/status_process_test.go
+++ b/pkg/syncer/status/status_process_test.go
@@ -313,6 +313,7 @@ func TestSyncerProcess(t *testing.T) {
 		upstreamURL                         string
 		upstreamLogicalCluster              string
 		syncTargetName                      string
+		syncTargetWorkspace                 logicalcluster.Name
 		syncTargetUID                       types.UID
 		advancedSchedulingEnabled           bool
 
@@ -324,7 +325,7 @@
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "",
 				map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 				},
 				map[string]string{
 					"kcp.dev/namespace-locator": `{"workspace":"root:org:ws","namespace":"test"}`,
@@ -332,14 +333,14 @@
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResource: changeDeployment(
 				deployment("theDeployment", "kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "", map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 				}, nil, nil),
 				addDeploymentStatus(appsv1.DeploymentStatus{
 					Replicas: 15,
 				})),
 			toResources: []runtime.Object{
 				deployment("theDeployment", "test", "root:org:ws", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				}, nil, nil),
 			},
 			resourceToProcessLogicalClusterName: "",
@@ -351,7 +352,7 @@
 				updateDeploymentAction("test",
 					toUnstructured(t, changeDeployment(
 						deployment("theDeployment", "test", "root:org:ws", map[string]string{
-							"state.workload.kcp.dev/us-west1": "Sync",
+							"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 						}, nil, nil),
 						addDeploymentStatus(appsv1.DeploymentStatus{
 							Replicas: 15,
@@ -363,7 +364,7 @@
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "",
 				map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 				},
 				map[string]string{
 					"kcp.dev/namespace-locator": `{"workspace":"root:org:ws","namespace":"test"}`,
@@ -376,7 +377,7 @@
 				})),
 			toResources: []runtime.Object{
 				deployment("theDeployment", "test", "root:org:ws", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				}, nil, nil),
 			},
 			resourceToProcessLogicalClusterName: "",
@@ -390,7 +391,7 @@
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "",
 				map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 				},
 				map[string]string{
 					"kcp.dev/namespace-locator": `{"workspace":"root:org:ws","namespace":"test"}`,
@@ -398,14 +399,14 @@
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResource: changeDeployment(
 				deployment("theDeployment", "kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "", map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 				}, nil, nil),
 				addDeploymentStatus(appsv1.DeploymentStatus{
 					Replicas: 15,
 				})),
 			toResources: []runtime.Object{
 				deployment("theDeployment", "test", "root:org:ws", map[string]string{
-					"state.workload.kcp.dev/us-west1": "Sync",
+					"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 				}, nil, nil),
 			},
 			resourceToProcessLogicalClusterName: "",
@@ -418,9 +419,9 @@
 				updateDeploymentAction("test",
 					toUnstructured(t, changeDeployment(
 						deployment("theDeployment", "test", "root:org:ws", map[string]string{
-							"state.workload.kcp.dev/us-west1": "Sync",
+							"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync",
 						}, map[string]string{
-							"experimental.status.workload.kcp.dev/us-west1": "{\"replicas\":15}",
+							"experimental.status.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "{\"replicas\":15}",
 						}, nil)))),
 			},
 		},
@@ -428,7 +429,7 @@
 			upstreamLogicalCluster: "root:org:ws",
 			fromNamespace: namespace("kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "",
 				map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 				},
 				map[string]string{
 					"kcp.dev/namespace-locator": `{"workspace":"root:org:ws","namespace":"test"}`,
@@ -436,18 +437,18 @@
 			gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
 			fromResource: changeDeployment(
 				deployment("theDeployment", "kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "", map[string]string{
-					"internal.workload.kcp.dev/cluster": "us-west1",
+					"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
 				}, nil, nil),
 				addDeploymentStatus(appsv1.DeploymentStatus{
 					Replicas: 15,
})), toResources: []runtime.Object{ deployment("theDeployment", "test", "root:org:ws", map[string]string{ - "state.workload.kcp.dev/us-west1": "Sync", + "state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync", }, nil, nil), }, resourceToProcessLogicalClusterName: "", @@ -390,7 +391,7 @@ func TestSyncerProcess(t *testing.T) { upstreamLogicalCluster: "root:org:ws", fromNamespace: namespace("kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "", map[string]string{ - "internal.workload.kcp.dev/cluster": "us-west1", + "internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5", }, map[string]string{ "kcp.dev/namespace-locator": `{"workspace":"root:org:ws","namespace":"test"}`, @@ -398,14 +399,14 @@ func TestSyncerProcess(t *testing.T) { gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, fromResource: changeDeployment( deployment("theDeployment", "kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "", map[string]string{ - "internal.workload.kcp.dev/cluster": "us-west1", + "internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5", }, nil, nil), addDeploymentStatus(appsv1.DeploymentStatus{ Replicas: 15, })), toResources: []runtime.Object{ deployment("theDeployment", "test", "root:org:ws", map[string]string{ - "state.workload.kcp.dev/us-west1": "Sync", + "state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync", }, nil, nil), }, resourceToProcessLogicalClusterName: "", @@ -418,9 +419,9 @@ func TestSyncerProcess(t *testing.T) { updateDeploymentAction("test", toUnstructured(t, changeDeployment( deployment("theDeployment", "test", "root:org:ws", map[string]string{ - "state.workload.kcp.dev/us-west1": "Sync", + "state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync", }, map[string]string{ - "experimental.status.workload.kcp.dev/us-west1": "{\"replicas\":15}", + "experimental.status.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "{\"replicas\":15}", }, nil)))), }, }, @@ -428,7 +429,7 @@ func TestSyncerProcess(t *testing.T) { upstreamLogicalCluster: "root:org:ws", fromNamespace: namespace("kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "", map[string]string{ - "internal.workload.kcp.dev/cluster": "us-west1", + "internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5", }, map[string]string{ "kcp.dev/namespace-locator": `{"workspace":"root:org:ws","namespace":"test"}`, @@ -436,18 +437,18 @@ func TestSyncerProcess(t *testing.T) { gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, fromResource: changeDeployment( deployment("theDeployment", "kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "", map[string]string{ - "internal.workload.kcp.dev/cluster": "us-west1", + "internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5", }, nil, nil), addDeploymentStatus(appsv1.DeploymentStatus{ Replicas: 15, })), toResources: []runtime.Object{ deployment("theDeployment", "test", "root:org:ws", map[string]string{ - "state.workload.kcp.dev/us-west1": "Sync", + "state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync", }, map[string]string{ - "deletion.internal.workload.kcp.dev/us-west1": time.Now().Format(time.RFC3339), - "experimental.status.workload.kcp.dev/us-west1": "{\"replicas\":15}", - }, []string{"workload.kcp.dev/syncer-us-west1"}), + "deletion.internal.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": time.Now().Format(time.RFC3339), + 
"experimental.status.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "{\"replicas\":15}", + }, []string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}), }, resourceToProcessLogicalClusterName: "", resourceToProcessName: "theDeployment", @@ -461,7 +462,7 @@ func TestSyncerProcess(t *testing.T) { upstreamLogicalCluster: "root:org:ws", fromNamespace: namespace("kcp0124d7647eb6a00b1fcb6f2252201601634989dd79deb7375c373973", "", map[string]string{ - "internal.workload.kcp.dev/cluster": "us-west1", + "internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5", }, map[string]string{ "kcp.dev/namespace-locator": `{"workspace":"root:org:ws","namespace":"test"}`, @@ -469,13 +470,13 @@ func TestSyncerProcess(t *testing.T) { gvr: schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}, fromResource: nil, toResources: []runtime.Object{ - namespace("test", "root:org:ws", map[string]string{"state.workload.kcp.dev/us-west1": "Sync"}, nil), + namespace("test", "root:org:ws", map[string]string{"state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync"}, nil), deployment("theDeployment", "test", "root:org:ws", map[string]string{ - "state.workload.kcp.dev/us-west1": "Sync", + "state.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": "Sync", }, map[string]string{ - "deletion.internal.workload.kcp.dev/us-west1": time.Now().Format(time.RFC3339), - "experimental.status.workload.kcp.dev/us-west1": `{"replicas":15}`, - }, []string{"workload.kcp.dev/syncer-us-west1"}), + "deletion.internal.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": time.Now().Format(time.RFC3339), + "experimental.status.workload.kcp.dev/2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5": `{"replicas":15}`, + }, []string{"workload.kcp.dev/syncer-2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5"}), }, resourceToProcessLogicalClusterName: "", resourceToProcessName: "theDeployment", @@ -506,9 +507,11 @@ func TestSyncerProcess(t *testing.T) { defer cancel() kcpLogicalCluster := logicalcluster.New(tc.upstreamLogicalCluster) - syncTargetUID := tc.syncTargetUID if tc.syncTargetUID == "" { - syncTargetUID = types.UID("syncTargetUID") + tc.syncTargetUID = types.UID("syncTargetUID") + } + if tc.syncTargetWorkspace.Empty() { + tc.syncTargetWorkspace = logicalcluster.New("root:org:ws") } var allFromResources []runtime.Object @@ -522,11 +525,12 @@ func TestSyncerProcess(t *testing.T) { client: toClient, } + syncTargetKey := workloadv1alpha1.ToSyncTargetKey(tc.syncTargetWorkspace, tc.syncTargetName) fromInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(fromClient, time.Hour, metav1.NamespaceAll, func(o *metav1.ListOptions) { - o.LabelSelector = workloadv1alpha1.InternalDownstreamClusterLabel + "=" + tc.syncTargetName + o.LabelSelector = workloadv1alpha1.InternalDownstreamClusterLabel + "=" + syncTargetKey }) toInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(toClusterClient.Cluster(logicalcluster.Wildcard), time.Hour, metav1.NamespaceAll, func(o *metav1.ListOptions) { - o.LabelSelector = workloadv1alpha1.ClusterResourceStateLabelPrefix + tc.syncTargetName + "=" + string(workloadv1alpha1.ResourceStateSync) + o.LabelSelector = workloadv1alpha1.ClusterResourceStateLabelPrefix + syncTargetKey + "=" + string(workloadv1alpha1.ResourceStateSync) }) setupServersideApplyPatchReactor(toClient) @@ -538,7 +542,7 @@ func TestSyncerProcess(t *testing.T) { {Group: "", Version: "v1", Resource: "namespaces"}, tc.gvr, } - controller, err := 
NewStatusSyncer(gvrs, kcpLogicalCluster, tc.syncTargetName, tc.advancedSchedulingEnabled, toClusterClient, fromClient, toInformers, fromInformers, syncTargetUID) + controller, err := NewStatusSyncer(gvrs, kcpLogicalCluster, tc.syncTargetName, syncTargetKey, tc.advancedSchedulingEnabled, toClusterClient, fromClient, toInformers, fromInformers, tc.syncTargetUID) require.NoError(t, err) toInformers.ForResource(tc.gvr).Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{}) diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index fc59c7b1ba0..79451d0e7ae 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -143,11 +143,12 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i } upstreamDiscoveryClient := upstreamDiscoveryClusterClient.WithCluster(logicalcluster.Wildcard) + syncTargetKey := workloadv1alpha1.ToSyncTargetKey(cfg.SyncTargetWorkspace, cfg.SyncTargetName) upstreamInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactory(upstreamDynamicClusterClient.Cluster(logicalcluster.Wildcard), resyncPeriod, metav1.NamespaceAll, func(o *metav1.ListOptions) { - o.LabelSelector = workloadv1alpha1.ClusterResourceStateLabelPrefix + cfg.SyncTargetName + "=" + string(workloadv1alpha1.ResourceStateSync) + o.LabelSelector = workloadv1alpha1.ClusterResourceStateLabelPrefix + syncTargetKey + "=" + string(workloadv1alpha1.ResourceStateSync) }) downstreamInformers := dynamicinformer.NewFilteredDynamicSharedInformerFactoryWithOptions(downstreamDynamicClient, metav1.NamespaceAll, func(o *metav1.ListOptions) { - o.LabelSelector = workloadv1alpha1.InternalDownstreamClusterLabel + "=" + cfg.SyncTargetName + o.LabelSelector = workloadv1alpha1.InternalDownstreamClusterLabel + "=" + syncTargetKey }, cache.WithResyncPeriod(resyncPeriod), cache.WithKeyFunction(keyfunctions.DeletionHandlingMetaNamespaceKeyFunc)) // TODO(ncdc): we need to provide user-facing details if this polling goes on forever. Blocking here is a bad UX. 
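Both informer factories in the hunk above now build their label selectors from the hashed syncTargetKey instead of the raw SyncTarget name. A minimal, self-contained sketch of that flow, assuming the workload/v1alpha1 helpers from this series are importable; the workspace and target name values are invented for illustration:

    package main

    import (
    	"fmt"

    	"github.com/kcp-dev/logicalcluster/v2"

    	workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
    )

    func main() {
    	// One stable key per (workspace, name) pair; labels and selectors no
    	// longer embed the raw SyncTarget name.
    	key := workloadv1alpha1.ToSyncTargetKey(logicalcluster.New("root:org:ws"), "us-west1")

    	// Upstream informers list only resources scheduled to this sync target.
    	fmt.Println(workloadv1alpha1.ClusterResourceStateLabelPrefix + key + "=" + string(workloadv1alpha1.ResourceStateSync))

    	// Downstream informers filter synced copies by the internal cluster label.
    	fmt.Println(workloadv1alpha1.InternalDownstreamClusterLabel + "=" + key)
    }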
@@ -189,14 +190,14 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i if err != nil { return err } - specSyncer, err := spec.NewSpecSyncer(gvrs, cfg.SyncTargetWorkspace, cfg.SyncTargetName, upstreamURL, advancedSchedulingEnabled, + specSyncer, err := spec.NewSpecSyncer(gvrs, cfg.SyncTargetWorkspace, cfg.SyncTargetName, syncTargetKey, upstreamURL, advancedSchedulingEnabled, upstreamDynamicClusterClient, downstreamDynamicClient, upstreamInformers, downstreamInformers, syncTarget.GetUID()) if err != nil { return err } klog.Infof("Creating status syncer for SyncTarget %s|%s, resources %v", cfg.SyncTargetWorkspace, cfg.SyncTargetName, resources) - statusSyncer, err := status.NewStatusSyncer(gvrs, cfg.SyncTargetWorkspace, cfg.SyncTargetName, advancedSchedulingEnabled, + statusSyncer, err := status.NewStatusSyncer(gvrs, cfg.SyncTargetWorkspace, cfg.SyncTargetName, syncTargetKey, advancedSchedulingEnabled, upstreamDynamicClusterClient, downstreamDynamicClient, upstreamInformers, downstreamInformers, syncTarget.GetUID()) if err != nil { return err diff --git a/pkg/virtual/syncer/builder/build.go b/pkg/virtual/syncer/builder/build.go index 934ae45ae8e..36caa34b2c5 100644 --- a/pkg/virtual/syncer/builder/build.go +++ b/pkg/virtual/syncer/builder/build.go @@ -157,9 +157,10 @@ func BuildVirtualWorkspace( wildcardKcpInformers.Workload().V1alpha1().SyncTargets(), wildcardKcpInformers.Apis().V1alpha1().APIResourceSchemas(), wildcardKcpInformers.Apis().V1alpha1().APIExports(), - func(syncTargetName string, apiResourceSchema *apisv1alpha1.APIResourceSchema, version string, apiExportIdentityHash string) (apidefinition.APIDefinition, error) { + func(syncTargetWorkspace logicalcluster.Name, syncTargetName string, apiResourceSchema *apisv1alpha1.APIResourceSchema, version string, apiExportIdentityHash string) (apidefinition.APIDefinition, error) { + syncTargetKey := workloadv1alpha1.ToSyncTargetKey(syncTargetWorkspace, syncTargetName) requirements, selectable := labels.SelectorFromSet(map[string]string{ - workloadv1alpha1.ClusterResourceStateLabelPrefix + syncTargetName: string(workloadv1alpha1.ResourceStateSync), + workloadv1alpha1.ClusterResourceStateLabelPrefix + syncTargetKey: string(workloadv1alpha1.ResourceStateSync), }).Requirements() if !selectable { return nil, fmt.Errorf("unable to create a selector from the provided labels") diff --git a/pkg/virtual/syncer/controllers/apireconciler/syncer_apireconciler_controller.go b/pkg/virtual/syncer/controllers/apireconciler/syncer_apireconciler_controller.go index 3076a4064a5..3e23f07f79d 100644 --- a/pkg/virtual/syncer/controllers/apireconciler/syncer_apireconciler_controller.go +++ b/pkg/virtual/syncer/controllers/apireconciler/syncer_apireconciler_controller.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/kcp-dev/logicalcluster/v2" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" @@ -48,7 +50,7 @@ const ( byWorkspace = ControllerName + "-byWorkspace" // will go away with scoping ) -type CreateAPIDefinitionFunc func(syncTargetName string, apiResourceSchema *apisv1alpha1.APIResourceSchema, version string, identityHash string) (apidefinition.APIDefinition, error) +type CreateAPIDefinitionFunc func(syncTargetWorkspace logicalcluster.Name, syncTargetName string, apiResourceSchema *apisv1alpha1.APIResourceSchema, version string, identityHash string) (apidefinition.APIDefinition, error) func NewAPIReconciler( kcpClusterClient 
kcpclient.ClusterInterface, @@ -289,7 +291,7 @@ func (c *APIReconciler) process(ctx context.Context, key string) error { for _, obj := range cs { cluster := obj.(*workloadv1alpha1.SyncTarget) apiDomainKey := dynamiccontext.APIDomainKey(clusters.ToClusterAwareKey(clusterName, cluster.Name)) - if err := c.reconcile(ctx, apiExport, apiDomainKey, cluster.Name); err != nil { + if err := c.reconcile(ctx, apiExport, apiDomainKey, logicalcluster.From(cluster), cluster.Name); err != nil { errs = append(errs, err) } } diff --git a/pkg/virtual/syncer/controllers/apireconciler/syncer_apireconciler_reconcile.go b/pkg/virtual/syncer/controllers/apireconciler/syncer_apireconciler_reconcile.go index 307e1d564a7..7a873d7f10a 100644 --- a/pkg/virtual/syncer/controllers/apireconciler/syncer_apireconciler_reconcile.go +++ b/pkg/virtual/syncer/controllers/apireconciler/syncer_apireconciler_reconcile.go @@ -34,7 +34,7 @@ import ( "github.com/kcp-dev/kcp/pkg/virtual/framework/internalapis" ) -func (c *APIReconciler) reconcile(ctx context.Context, apiExport *apisv1alpha1.APIExport, apiDomainKey dynamiccontext.APIDomainKey, syncTargetName string) error { +func (c *APIReconciler) reconcile(ctx context.Context, apiExport *apisv1alpha1.APIExport, apiDomainKey dynamiccontext.APIDomainKey, syncTargetWorkspace logicalcluster.Name, syncTargetName string) error { if apiExport == nil || apiExport.Status.IdentityHash == "" { // new APIExport that is not ready yet, or export got deleted @@ -114,7 +114,7 @@ func (c *APIReconciler) reconcile(ctx context.Context, apiExport *apisv1alpha1.A } } - apiDefinition, err := c.createAPIDefinition(syncTargetName, apiResourceSchema, version.Name, schemaIdentity[apiResourceSchema.Name]) + apiDefinition, err := c.createAPIDefinition(syncTargetWorkspace, syncTargetName, apiResourceSchema, version.Name, schemaIdentity[apiResourceSchema.Name]) if err != nil { klog.Errorf("failed to create API definition for %s: %v", gvr, err) continue diff --git a/test/e2e/reconciler/cluster/controller_test.go b/test/e2e/reconciler/cluster/controller_test.go index 95b49ab9d89..108f898284c 100644 --- a/test/e2e/reconciler/cluster/controller_test.go +++ b/test/e2e/reconciler/cluster/controller_test.go @@ -66,28 +66,29 @@ func TestClusterController(t *testing.T) { { name: "create an object, expect spec and status to sync to sink, then delete", work: func(ctx context.Context, t *testing.T, servers map[string]runningServer, syncerFixture *framework.StartedSyncerFixture) { + kcpClient, err := kcpclientset.NewForConfig(syncerFixture.SyncerConfig.UpstreamConfig) + require.NoError(t, err) + + syncTarget, err := kcpClient.WorkloadV1alpha1().SyncTargets().Get(ctx, + syncerFixture.SyncerConfig.SyncTargetName, + metav1.GetOptions{}, + ) + require.NoError(t, err) + syncTargetKey := workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(syncTarget), syncTarget.GetName()) + t.Logf("Creating cowboy timothy") cowboy, err := servers[sourceClusterName].client.Cowboys(testNamespace).Create(ctx, &wildwestv1alpha1.Cowboy{ ObjectMeta: metav1.ObjectMeta{ Name: "timothy", Labels: map[string]string{ - "state.workload.kcp.dev/" + sinkClusterName: string(workloadv1alpha1.ResourceStateSync), + "state.workload.kcp.dev/" + syncTargetKey: string(workloadv1alpha1.ResourceStateSync), }, }, Spec: wildwestv1alpha1.CowboySpec{Intent: "yeehaw"}, }, metav1.CreateOptions{}) require.NoError(t, err, "failed to create cowboy") - kcpClient, err := kcpclientset.NewForConfig(syncerFixture.SyncerConfig.UpstreamConfig) - require.NoError(t, err) - - syncTarget, 
err := kcpClient.WorkloadV1alpha1().SyncTargets().Get(ctx, - syncerFixture.SyncerConfig.SyncTargetName, - metav1.GetOptions{}, - ) - require.NoError(t, err) - - nsLocator := shared.NewNamespaceLocator(syncerFixture.SyncerConfig.SyncTargetWorkspace, logicalcluster.From(syncTarget), syncTarget.GetUID(), syncTarget.GetName(), cowboy.Namespace) + nsLocator := shared.NewNamespaceLocator(logicalcluster.From(cowboy), logicalcluster.From(syncTarget), syncTarget.GetUID(), syncTarget.GetName(), cowboy.Namespace) targetNamespace, err := shared.PhysicalClusterNamespaceName(nsLocator) require.NoError(t, err, "Error determining namespace mapping for %v", nsLocator) diff --git a/test/e2e/reconciler/ingress/controller_test.go b/test/e2e/reconciler/ingress/controller_test.go index 8ecafe40961..89e2c633ed6 100644 --- a/test/e2e/reconciler/ingress/controller_test.go +++ b/test/e2e/reconciler/ingress/controller_test.go @@ -179,6 +179,8 @@ func TestIngressController(t *testing.T) { }), ).Start(t) + syncerTargetKey := workloadv1alpha1.ToSyncTargetKey(syncerFixture.SyncerConfig.SyncTargetWorkspace, syncerFixture.SyncerConfig.SyncTargetName) + t.Log("Wait for \"kubernetes\" apiexport") require.Eventually(t, func() bool { _, err := sourceKcpClient.ApisV1alpha1().APIExports().Get(ctx, "kubernetes", metav1.GetOptions{}) @@ -282,7 +284,7 @@ func TestIngressController(t *testing.T) { klog.Error(err) return false } - return ns.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncerFixture.SyncerConfig.SyncTargetName] != "" + return ns.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncerTargetKey] != "" }, wait.ForeverTestTimeout, time.Millisecond*100) t.Log("Creating service in source cluster") @@ -314,7 +316,7 @@ func TestIngressController(t *testing.T) { klog.Error(err) return false } - return ns.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncerFixture.SyncerConfig.SyncTargetName] != "" + return ns.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncerTargetKey] != "" }, wait.ForeverTestTimeout, time.Millisecond*100) t.Log("Starting ingress-controller...") diff --git a/test/e2e/reconciler/namespace/controller_test.go b/test/e2e/reconciler/namespace/controller_test.go index 546c217443b..fa693bebbef 100644 --- a/test/e2e/reconciler/namespace/controller_test.go +++ b/test/e2e/reconciler/namespace/controller_test.go @@ -109,6 +109,7 @@ func TestNamespaceScheduler(t *testing.T) { } return binding.Status.Phase == apisv1alpha1.APIBindingPhaseBound }, wait.ForeverTestTimeout, time.Millisecond*100) + syncTargetKey := workloadv1alpha1.ToSyncTargetKey(syncerFixture.SyncerConfig.SyncTargetWorkspace, syncerFixture.SyncerConfig.SyncTargetName) t.Log("Wait until the namespace is scheduled to the workload cluster") require.Eventually(t, func() bool { @@ -117,7 +118,7 @@ func TestNamespaceScheduler(t *testing.T) { klog.Error(err) return false } - return scheduledMatcher(syncTargetName)(ns) == nil + return scheduledMatcher(syncTargetKey)(ns) == nil }, wait.ForeverTestTimeout, time.Second) t.Log("Cordon the cluster and expect the namespace to end up unschedulable") @@ -156,6 +157,7 @@ func TestNamespaceScheduler(t *testing.T) { } cluster, err = server.kcpClient.WorkloadV1alpha1().SyncTargets().Create(ctx, cluster, metav1.CreateOptions{}) require.NoError(t, err, "failed to create cluster") + syncTargetKey := workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(cluster), cluster.Name) go wait.UntilWithContext(ctx, func(ctx context.Context) { patchBytes := 
[]byte(fmt.Sprintf(`[{"op":"replace","path":"/status/lastSyncerHeartbeatTime","value":%q}]`, time.Now().Format(time.RFC3339))) @@ -192,7 +194,7 @@ func TestNamespaceScheduler(t *testing.T) { klog.Errorf("failed to get sheriff: %v", err) return false } - return obj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+cluster.Name] != "" + return obj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] != "" }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see sheriff scheduled") t.Log("Delete the sheriff and the sheriff CRD") @@ -218,7 +220,7 @@ func TestNamespaceScheduler(t *testing.T) { klog.Errorf("failed to get sheriff: %v", err) return false } - return obj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+cluster.Name] != "" + return obj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] != "" }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to see sheriff scheduled") }, }, diff --git a/test/e2e/reconciler/scheduling/controller_test.go b/test/e2e/reconciler/scheduling/controller_test.go index 9ec36d4cc33..a8f08acae9d 100644 --- a/test/e2e/reconciler/scheduling/controller_test.go +++ b/test/e2e/reconciler/scheduling/controller_test.go @@ -39,6 +39,7 @@ import ( apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1" "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions" + workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" kcpclient "github.com/kcp-dev/kcp/pkg/client/clientset/versioned" kubefixtures "github.com/kcp-dev/kcp/test/e2e/fixtures/kube" "github.com/kcp-dev/kcp/test/e2e/framework" @@ -256,12 +257,14 @@ func TestScheduling(t *testing.T) { return true }, wait.ForeverTestTimeout, time.Millisecond*100) + syncTargetKey := workloadv1alpha1.ToSyncTargetKey(syncerFixture.SyncerConfig.SyncTargetWorkspace, syncTargetName) + t.Logf("Create a service in the user workspace") _, err = kubeClusterClient.Cluster(userClusterName).CoreV1().Services("default").Create(ctx, &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "first", Labels: map[string]string{ - "state.workload.kcp.dev/" + syncTargetName: "Sync", + "state.workload.kcp.dev/" + syncTargetKey: "Sync", }, }, Spec: corev1.ServiceSpec{ @@ -280,7 +283,7 @@ func TestScheduling(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "second", Labels: map[string]string{ - "state.workload.kcp.dev/" + syncTargetName: "Sync", + "state.workload.kcp.dev/" + syncTargetKey: "Sync", }, }, Spec: corev1.ServiceSpec{ @@ -298,7 +301,7 @@ func TestScheduling(t *testing.T) { var downstreamServices *corev1.ServiceList require.Eventually(t, func() bool { downstreamServices, err = syncerFixture.DownstreamKubeClient.CoreV1().Services("").List(ctx, metav1.ListOptions{ - LabelSelector: "internal.workload.kcp.dev/cluster=" + syncTargetName, + LabelSelector: "internal.workload.kcp.dev/cluster=" + syncTargetKey, }) if errors.IsNotFound(err) { return false diff --git a/test/e2e/reconciler/scheduling/multi_placements_test.go b/test/e2e/reconciler/scheduling/multi_placements_test.go index cca306412cd..efafb0410ae 100644 --- a/test/e2e/reconciler/scheduling/multi_placements_test.go +++ b/test/e2e/reconciler/scheduling/multi_placements_test.go @@ -273,11 +273,11 @@ func TestMultiPlacement(t *testing.T) { return false, fmt.Sprintf("Failed to get service: %v", err) } - if svc.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+firstSyncTargetName] != 
string(workloadv1alpha1.ResourceStateSync) { + if svc.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+workloadv1alpha1.ToSyncTargetKey(firstSyncerFixture.SyncerConfig.SyncTargetWorkspace, firstSyncTargetName)] != string(workloadv1alpha1.ResourceStateSync) { return false, fmt.Sprintf("%s is not added to ns annotation", firstSyncTargetName) } - if svc.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+secondSyncTargetName] != string(workloadv1alpha1.ResourceStateSync) { + if svc.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+workloadv1alpha1.ToSyncTargetKey(secondSyncerFixture.SyncerConfig.SyncTargetWorkspace, secondSyncTargetName)] != string(workloadv1alpha1.ResourceStateSync) { return false, fmt.Sprintf("%s is not added to ns annotation", secondSyncTargetName) } diff --git a/test/e2e/reconciler/scheduling/placement_scheduler_test.go b/test/e2e/reconciler/scheduling/placement_scheduler_test.go index c68b318145f..f7d158df38b 100644 --- a/test/e2e/reconciler/scheduling/placement_scheduler_test.go +++ b/test/e2e/reconciler/scheduling/placement_scheduler_test.go @@ -154,6 +154,7 @@ func TestPlacementUpdate(t *testing.T) { }, }, metav1.CreateOptions{}) require.NoError(t, err) + firstSyncTargetKey := workloadv1alpha1.ToSyncTargetKey(syncerFixture.SyncerConfig.SyncTargetWorkspace, firstSyncTargetName) t.Logf("Wait for the service to have the sync label") framework.Eventually(t, func() (bool, string) { @@ -162,7 +163,7 @@ func TestPlacementUpdate(t *testing.T) { return false, fmt.Sprintf("Failed to get service: %v", err) } - return svc.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+firstSyncTargetName] == string(workloadv1alpha1.ResourceStateSync), "" + return svc.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+firstSyncTargetKey] == string(workloadv1alpha1.ResourceStateSync), "" }, wait.ForeverTestTimeout, time.Millisecond*100) t.Logf("Wait for the service to be sync to the downstream cluster") @@ -214,7 +215,7 @@ func TestPlacementUpdate(t *testing.T) { return false, fmt.Sprintf("Failed to get ns: %v", err) } - if len(ns.Annotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+firstSyncTargetName]) == 0 { + if len(ns.Annotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+firstSyncTargetKey]) == 0 { return false, fmt.Sprintf("resource should be removed but got %s", toYaml(ns)) } return true, "" @@ -226,7 +227,7 @@ func TestPlacementUpdate(t *testing.T) { return false, fmt.Sprintf("Failed to get service: %v", err) } - if len(svc.Annotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+firstSyncTargetName]) == 0 { + if len(svc.Annotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+firstSyncTargetKey]) == 0 { return false, fmt.Sprintf("resource should be removed but got %s", toYaml(svc)) } return true, "" @@ -314,10 +315,10 @@ func TestPlacementUpdate(t *testing.T) { return false, fmt.Sprintf("Failed to get service: %v", err) } - if len(svc.Annotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+firstSyncTargetName]) != 0 { + if len(svc.Annotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+firstSyncTargetKey]) != 0 { return false, fmt.Sprintf("resource should not be removed but got %s", toYaml(svc)) } - return svc.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+firstSyncTargetName] == string(workloadv1alpha1.ResourceStateSync), "" + return svc.Labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+firstSyncTargetKey] == 
string(workloadv1alpha1.ResourceStateSync), "" }, wait.ForeverTestTimeout, time.Millisecond*100) t.Logf("Wait for the service to be sync to the downstream cluster") diff --git a/test/e2e/syncer/syncer_test.go b/test/e2e/syncer/syncer_test.go index 23484ffd029..b69e3abea2c 100644 --- a/test/e2e/syncer/syncer_test.go +++ b/test/e2e/syncer/syncer_test.go @@ -33,6 +33,7 @@ import ( rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/yaml" kubernetesclientset "k8s.io/client-go/kubernetes" @@ -40,6 +41,7 @@ import ( "k8s.io/utils/pointer" kyaml "sigs.k8s.io/yaml" + workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" clientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned" kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned" "github.com/kcp-dev/kcp/pkg/syncer/shared" @@ -141,6 +143,8 @@ func TestSyncerLifecycle(t *testing.T) { upstreamDeployment, err := upstreamKubeClient.AppsV1().Deployments(upstreamNamespace.Name).Create(ctx, deployment, metav1.CreateOptions{}) require.NoError(t, err, "failed to create deployment") + syncTargetKey := workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(syncTarget), syncTarget.Name) + t.Logf("Waiting for upstream deployment %s/%s to get the syncer finalizer", upstreamNamespace.Name, upstreamDeployment.Name) require.Eventually(t, func() bool { deployment, err = upstreamKubeClient.AppsV1().Deployments(upstreamNamespace.Name).Get(ctx, upstreamDeployment.Name, metav1.GetOptions{}) @@ -151,7 +155,7 @@ func TestSyncerLifecycle(t *testing.T) { t.Errorf("saw an error waiting for upstream deployment %s/%s to get the syncer finalizer: %v", upstreamNamespace.Name, upstreamDeployment.Name, err) } for _, finalizer := range deployment.Finalizers { - if finalizer == "workload.kcp.dev/syncer-"+syncerFixture.SyncerConfig.SyncTargetName { + if finalizer == "workload.kcp.dev/syncer-"+syncTargetKey { return true } } @@ -267,17 +271,8 @@ func TestSyncerLifecycle(t *testing.T) { // Add a virtual Finalizer to the deployment and update it. 
t.Logf("Adding a virtual finalizer to the upstream deployment %s/%s in order to simulate an external controller", upstreamNamespace.Name, upstreamDeployment.Name) - upstreamDeployment, err = upstreamKubeClient.AppsV1().Deployments(upstreamNamespace.Name).Get(ctx, upstreamDeployment.Name, metav1.GetOptions{}) - require.NoError(t, err) - - upstreamDeployment = upstreamDeployment.DeepCopy() - annotations := upstreamDeployment.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string, 0) - } - annotations["finalizers.workload.kcp.dev/"+syncerFixture.SyncerConfig.SyncTargetName] = "external-controller-finalizer" - upstreamDeployment.SetAnnotations(annotations) - _, err = upstreamKubeClient.AppsV1().Deployments(upstreamNamespace.Name).Update(ctx, upstreamDeployment, metav1.UpdateOptions{}) + deploymentPatch := []byte(`{"metadata":{"annotations":{"finalizers.workload.kcp.dev/` + syncTargetKey + `":"external-controller-finalizer"}}}`) + _, err = upstreamKubeClient.AppsV1().Deployments(upstreamNamespace.Name).Patch(ctx, upstreamDeployment.Name, types.MergePatchType, deploymentPatch, metav1.PatchOptions{}) require.NoError(t, err) t.Logf("Deleting upstream deployment %s/%s", upstreamNamespace.Name, upstreamDeployment.Name) @@ -291,7 +286,7 @@ func TestSyncerLifecycle(t *testing.T) { return false, "" } require.NoError(t, err) - if val, ok := deployment.GetAnnotations()["deletion.internal.workload.kcp.dev/"+syncerFixture.SyncerConfig.SyncTargetName]; ok && val != "" { + if val, ok := deployment.GetAnnotations()["deletion.internal.workload.kcp.dev/"+syncTargetKey]; ok && val != "" { return true, "" } return false, toYaml(deployment) @@ -308,28 +303,19 @@ func TestSyncerLifecycle(t *testing.T) { }, 5*time.Second, time.Second, "upstream Deployment got deleted or there was an error", upstreamNamespace.Name, upstreamDeployment.Name) t.Logf("Checking if the downstream deployment %s/%s is deleted or not (shouldn't as there's a virtual finalizer that blocks the deletion of the downstream resource)", downstreamNamespaceName, upstreamDeployment.Name) - require.Never(t, func() bool { + require.Neverf(t, func() bool { _, err := downstreamKubeClient.AppsV1().Deployments(downstreamNamespaceName).Get(ctx, upstreamDeployment.Name, metav1.GetOptions{}) if apierrors.IsNotFound(err) { return true } require.NoError(t, err) return false - }, 5*time.Second, time.Second, "downstream Deployment got deleted or there was an error", downstreamNamespaceName, upstreamDeployment.Name) + }, 5*time.Second, time.Second, "downstream Deployment %s/%s got deleted or there was an error", downstreamNamespaceName, upstreamDeployment.Name) // deleting a virtual Finalizer on the deployment and updating it. 
t.Logf("Removing the virtual finalizer on the upstream deployment %s/%s, the deployment deletion should go through after this", upstreamNamespace.Name, upstreamDeployment.Name) - upstreamDeployment, err = upstreamKubeClient.AppsV1().Deployments(upstreamNamespace.Name).Get(ctx, upstreamDeployment.Name, metav1.GetOptions{}) - require.NoError(t, err) - - upstreamDeployment = upstreamDeployment.DeepCopy() - annotations = upstreamDeployment.GetAnnotations() - if annotations == nil { - annotations = make(map[string]string, 0) - } - annotations["finalizers.workload.kcp.dev/"+syncerFixture.SyncerConfig.SyncTargetName] = "" - upstreamDeployment.SetAnnotations(annotations) - _, err = upstreamKubeClient.AppsV1().Deployments(upstreamNamespace.Name).Update(ctx, upstreamDeployment, metav1.UpdateOptions{}) + deploymentPatch = []byte(`{"metadata":{"annotations":{"finalizers.workload.kcp.dev/` + syncTargetKey + `": null}}}`) + _, err = upstreamKubeClient.AppsV1().Deployments(upstreamNamespace.Name).Patch(ctx, upstreamDeployment.Name, types.MergePatchType, deploymentPatch, metav1.PatchOptions{}) require.NoError(t, err) t.Logf("Waiting for upstream deployment %s/%s to be deleted", upstreamNamespace.Name, upstreamDeployment.Name) From fc384924f5afce17ceb8c4afc2952342fcf8e5a0 Mon Sep 17 00:00:00 2001 From: Joaquim Moreno Date: Thu, 28 Jul 2022 19:19:10 +0000 Subject: [PATCH 3/3] Update go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index a126562ff62..864f6eb803c 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/kcp-dev/apimachinery v0.0.0-20220719204446-c2de90c1e6ac github.com/kcp-dev/kcp/pkg/apis v0.0.0-00010101000000-000000000000 github.com/kcp-dev/logicalcluster/v2 v2.0.0-alpha.0 - github.com/martinlindhe/base36 v1.1.1 github.com/muesli/reflow v0.1.0 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/sirupsen/logrus v1.8.1 @@ -101,6 +100,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect github.com/mailru/easyjson v0.7.6 // indirect + github.com/martinlindhe/base36 v1.1.1 // indirect github.com/mattn/go-colorable v0.1.8 // indirect github.com/mattn/go-isatty v0.0.12 // indirect github.com/mattn/go-runewidth v0.0.7 // indirect