diff --git a/pkg/syncer/spec/mutators/deployment.go b/pkg/syncer/spec/mutators/podspecable.go
similarity index 71%
rename from pkg/syncer/spec/mutators/deployment.go
rename to pkg/syncer/spec/mutators/podspecable.go
index c7cf6bc86c6..1ebb765efbc 100644
--- a/pkg/syncer/spec/mutators/deployment.go
+++ b/pkg/syncer/spec/mutators/podspecable.go
@@ -23,7 +23,6 @@ import (
 	"github.com/kcp-dev/logicalcluster/v3"
 
-	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -38,7 +37,7 @@ import (
 
 type ListSecretFunc func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error)
 
-type DeploymentMutator struct {
+type PodSpecableMutator struct {
 	upstreamURL   *url.URL
 	listSecrets   ListSecretFunc
 	serviceLister listerscorev1.ServiceLister
@@ -49,18 +48,30 @@ type DeploymentMutator struct {
 	upsyncPods bool
 }
 
-func (dm *DeploymentMutator) GVR() schema.GroupVersionResource {
-	return schema.GroupVersionResource{
-		Group:    "apps",
-		Version:  "v1",
-		Resource: "deployments",
+func (dm *PodSpecableMutator) GVRs() []schema.GroupVersionResource {
+	return []schema.GroupVersionResource{
+		{
+			Group:    "apps",
+			Version:  "v1",
+			Resource: "deployments",
+		},
+		{
+			Group:    "apps",
+			Version:  "v1",
+			Resource: "statefulsets",
+		},
+		{
+			Group:    "apps",
+			Version:  "v1",
+			Resource: "replicasets",
+		},
 	}
 }
 
-func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
+func NewPodspecableMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
 	syncTargetClusterName logicalcluster.Name,
-	syncTargetUID types.UID, syncTargetName, dnsNamespace string, upsyncPods bool) *DeploymentMutator {
-	return &DeploymentMutator{
+	syncTargetUID types.UID, syncTargetName, dnsNamespace string, upsyncPods bool) *PodSpecableMutator {
+	return &PodSpecableMutator{
 		upstreamURL:   upstreamURL,
 		listSecrets:   secretLister,
 		serviceLister: serviceLister,
@@ -73,24 +84,30 @@ func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, ser
 }
 
 // Mutate applies the mutator changes to the object.
-func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
-	var deployment appsv1.Deployment
-	err := runtime.DefaultUnstructuredConverter.FromUnstructured(
-		obj.UnstructuredContent(),
-		&deployment)
+func (dm *PodSpecableMutator) Mutate(obj *unstructured.Unstructured) error {
+	podTemplateUnstr, ok, err := unstructured.NestedMap(obj.UnstructuredContent(), "spec", "template")
 	if err != nil {
 		return err
 	}
-	upstreamLogicalName := logicalcluster.From(obj)
+	if !ok {
+		return fmt.Errorf("object should have a PodTemplate.Spec 'spec.template', but doesn't: %v", obj)
+	}
 
-	templateSpec := &deployment.Spec.Template.Spec
+	podTemplate := &corev1.PodTemplateSpec{}
+	err = runtime.DefaultUnstructuredConverter.FromUnstructured(
+		podTemplateUnstr,
+		&podTemplate)
+	if err != nil {
+		return err
+	}
 
+	upstreamLogicalName := logicalcluster.From(obj)
 	desiredServiceAccountName := "default"
-	if templateSpec.ServiceAccountName != "" && templateSpec.ServiceAccountName != "default" {
-		desiredServiceAccountName = templateSpec.ServiceAccountName
+	if podTemplate.Spec.ServiceAccountName != "" && podTemplate.Spec.ServiceAccountName != "default" {
+		desiredServiceAccountName = podTemplate.Spec.ServiceAccountName
 	}
 
-	rawSecretList, err := dm.listSecrets(upstreamLogicalName, deployment.Namespace)
+	rawSecretList, err := dm.listSecrets(upstreamLogicalName, obj.GetNamespace())
 	if err != nil {
 		return fmt.Errorf("error listing secrets for workspace %s: %w", upstreamLogicalName.String(), err)
 	}
@@ -123,14 +140,14 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
 	}
 
 	if desiredSecretName == "" {
-		return fmt.Errorf("couldn't find a token upstream for the serviceaccount %s/%s in workspace %s", desiredServiceAccountName, deployment.Namespace, upstreamLogicalName.String())
+		return fmt.Errorf("couldn't find a token upstream for the serviceaccount %s/%s in workspace %s", desiredServiceAccountName, obj.GetNamespace(), upstreamLogicalName.String())
 	}
 
 	// Setting AutomountServiceAccountToken to false allow us to control the ServiceAccount
 	// VolumeMount and Volume definitions.
-	templateSpec.AutomountServiceAccountToken = utilspointer.BoolPtr(false)
+	podTemplate.Spec.AutomountServiceAccountToken = utilspointer.BoolPtr(false)
 
 	// Set to empty the serviceAccountName on podTemplate as we are not syncing the serviceAccount down to the workload cluster.
- templateSpec.ServiceAccountName = "" + podTemplate.Spec.ServiceAccountName = "" kcpExternalHost := dm.upstreamURL.Hostname() kcpExternalPort := dm.upstreamURL.Port() @@ -193,42 +210,42 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error { } // Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the containers - for i := range deployment.Spec.Template.Spec.Containers { + for i := range podTemplate.Spec.Containers { for _, overrideEnv := range overrideEnvs { - templateSpec.Containers[i].Env = updateEnv(templateSpec.Containers[i].Env, overrideEnv) + podTemplate.Spec.Containers[i].Env = updateEnv(podTemplate.Spec.Containers[i].Env, overrideEnv) } - templateSpec.Containers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.Containers[i].Env, deployment) - templateSpec.Containers[i].VolumeMounts = updateVolumeMount(templateSpec.Containers[i].VolumeMounts, serviceAccountMount) + podTemplate.Spec.Containers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.Containers[i].Env, obj) + podTemplate.Spec.Containers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.Containers[i].VolumeMounts, serviceAccountMount) } // Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the Init containers - for i := range templateSpec.InitContainers { + for i := range podTemplate.Spec.InitContainers { for _, overrideEnv := range overrideEnvs { - templateSpec.InitContainers[i].Env = updateEnv(templateSpec.InitContainers[i].Env, overrideEnv) + podTemplate.Spec.InitContainers[i].Env = updateEnv(podTemplate.Spec.InitContainers[i].Env, overrideEnv) } - templateSpec.InitContainers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.InitContainers[i].Env, deployment) - templateSpec.InitContainers[i].VolumeMounts = updateVolumeMount(templateSpec.InitContainers[i].VolumeMounts, serviceAccountMount) + podTemplate.Spec.InitContainers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.InitContainers[i].Env, obj) + podTemplate.Spec.InitContainers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.InitContainers[i].VolumeMounts, serviceAccountMount) } // Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the Ephemeral containers - for i := range templateSpec.EphemeralContainers { + for i := range podTemplate.Spec.EphemeralContainers { for _, overrideEnv := range overrideEnvs { - templateSpec.EphemeralContainers[i].Env = updateEnv(templateSpec.EphemeralContainers[i].Env, overrideEnv) + podTemplate.Spec.EphemeralContainers[i].Env = updateEnv(podTemplate.Spec.EphemeralContainers[i].Env, overrideEnv) } - templateSpec.EphemeralContainers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.EphemeralContainers[i].Env, deployment) - templateSpec.EphemeralContainers[i].VolumeMounts = updateVolumeMount(templateSpec.EphemeralContainers[i].VolumeMounts, serviceAccountMount) + podTemplate.Spec.EphemeralContainers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.EphemeralContainers[i].Env, obj) + podTemplate.Spec.EphemeralContainers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.EphemeralContainers[i].VolumeMounts, serviceAccountMount) } // Add the ServiceAccount volume with our overrides. 
 	found := false
-	for i := range templateSpec.Volumes {
-		if templateSpec.Volumes[i].Name == "kcp-api-access" {
-			templateSpec.Volumes[i] = serviceAccountVolume
+	for i := range podTemplate.Spec.Volumes {
+		if podTemplate.Spec.Volumes[i].Name == "kcp-api-access" {
+			podTemplate.Spec.Volumes[i] = serviceAccountVolume
 			found = true
 		}
 	}
 	if !found {
-		templateSpec.Volumes = append(templateSpec.Volumes, serviceAccountVolume)
+		podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, serviceAccountVolume)
 	}
 
 	// Overrides DNS to point to the workspace DNS
@@ -238,8 +255,8 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
 		return err // retry
 	}
 
-	deployment.Spec.Template.Spec.DNSPolicy = corev1.DNSNone
-	deployment.Spec.Template.Spec.DNSConfig = &corev1.PodDNSConfig{
+	podTemplate.Spec.DNSPolicy = corev1.DNSNone
+	podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{
 		Nameservers: []string{dnsIP},
 		Searches: []string{ // TODO(LV): from /etc/resolv.conf
 			obj.GetNamespace() + ".svc.cluster.local",
@@ -256,31 +273,29 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
 	if dm.upsyncPods {
 		syncTargetKey := workloadv1alpha1.ToSyncTargetKey(dm.syncTargetClusterName, dm.syncTargetName)
-		labels := deployment.Spec.Template.Labels
+		labels := podTemplate.Labels
 		if labels == nil {
 			labels = map[string]string{}
 		}
 		labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] = string(workloadv1alpha1.ResourceStateUpsync)
 		labels[workloadv1alpha1.InternalDownstreamClusterLabel] = syncTargetKey
-		deployment.Spec.Template.Labels = labels
+		podTemplate.Labels = labels
 
 		// TODO(davidfestal): In the future we could add a diff annotation to transform the resource while upsyncing:
 		// - remove unnecessary fields we don't want leaking to upstream
 		// - add an owner reference to the upstream deployment
 	}
 
-	unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&deployment)
+	newPodTemplateUnstr, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplate)
 	if err != nil {
 		return err
 	}
 
 	// Set the changes back into the obj.
-	obj.SetUnstructuredContent(unstructured)
-
-	return nil
+	return unstructured.SetNestedMap(obj.Object, newPodTemplateUnstr, "spec", "template")
 }
 
-func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name) (string, error) {
+func (dm *PodSpecableMutator) getDNSIPForWorkspace(workspace logicalcluster.Name) (string, error) {
 	// Retrieve the DNS IP associated to the workspace
 	dnsServiceName := shared.GetDNSID(workspace, dm.syncTargetUID, dm.syncTargetName)
 
@@ -299,13 +314,13 @@ func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name)
 }
 
 // resolveDownwardAPIFieldRefEnv replaces the downwardAPI FieldRef EnvVars with the value from the deployment, right now it only replaces the metadata.namespace.
-func resolveDownwardAPIFieldRefEnv(envs []corev1.EnvVar, deployment appsv1.Deployment) []corev1.EnvVar {
+func resolveDownwardAPIFieldRefEnv(envs []corev1.EnvVar, podspecable *unstructured.Unstructured) []corev1.EnvVar {
 	var result []corev1.EnvVar
 	for _, env := range envs {
 		if env.ValueFrom != nil && env.ValueFrom.FieldRef != nil && env.ValueFrom.FieldRef.FieldPath == "metadata.namespace" {
 			result = append(result, corev1.EnvVar{
 				Name:  env.Name,
-				Value: deployment.Namespace,
+				Value: podspecable.GetNamespace(),
 			})
 		} else {
 			result = append(result, env)
diff --git a/pkg/syncer/spec/mutators/deployment_test.go b/pkg/syncer/spec/mutators/podspecable_test.go
similarity index 99%
rename from pkg/syncer/spec/mutators/deployment_test.go
rename to pkg/syncer/spec/mutators/podspecable_test.go
index 35230f5680c..c2d196ed24c 100644
--- a/pkg/syncer/spec/mutators/deployment_test.go
+++ b/pkg/syncer/spec/mutators/podspecable_test.go
@@ -941,7 +941,7 @@ func TestDeploymentMutate(t *testing.T) {
 			require.NoError(t, err, "Service Add() = %v", err)
 			svcLister := listerscorev1.NewServiceLister(serviceIndexer)
 
-			dm := NewDeploymentMutator(upstreamURL, secretLister, svcLister, clusterName, "syncTargetUID", "syncTargetName", "dnsNamespace", c.upsyncPods)
+			dm := NewPodspecableMutator(upstreamURL, secretLister, svcLister, clusterName, "syncTargetUID", "syncTargetName", "dnsNamespace", c.upsyncPods)
 
 			unstrOriginalDeployment, err := toUnstructured(c.originalDeployment)
 			require.NoError(t, err, "toUnstructured() = %v", err)
diff --git a/pkg/syncer/spec/mutators/secrets.go b/pkg/syncer/spec/mutators/secrets.go
index 0c4addeb6c8..63cb6ca1ca1 100644
--- a/pkg/syncer/spec/mutators/secrets.go
+++ b/pkg/syncer/spec/mutators/secrets.go
@@ -25,11 +25,13 @@ import (
 type SecretMutator struct {
 }
 
-func (sm *SecretMutator) GVR() schema.GroupVersionResource {
-	return schema.GroupVersionResource{
-		Group:    "",
-		Version:  "v1",
-		Resource: "secrets",
+func (sm *SecretMutator) GVRs() []schema.GroupVersionResource {
+	return []schema.GroupVersionResource{
+		{
+			Group:    "",
+			Version:  "v1",
+			Resource: "secrets",
+		},
 	}
 }
diff --git a/pkg/syncer/spec/spec_controller.go b/pkg/syncer/spec/spec_controller.go
index 59da4093c78..36e31eeb04f 100644
--- a/pkg/syncer/spec/spec_controller.go
+++ b/pkg/syncer/spec/spec_controller.go
@@ -19,7 +19,6 @@ package spec
 import (
 	"context"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"net/url"
 	"time"
@@ -33,8 +32,6 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -53,7 +50,6 @@ import (
 	syncerindexers "github.com/kcp-dev/kcp/pkg/syncer/indexers"
 	"github.com/kcp-dev/kcp/pkg/syncer/shared"
 	"github.com/kcp-dev/kcp/pkg/syncer/spec/dns"
-	specmutators "github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
"github.com/kcp-dev/kcp/tmc/pkg/logging" ) @@ -63,10 +59,15 @@ const ( var namespaceGVR schema.GroupVersionResource = corev1.SchemeGroupVersion.WithResource("namespaces") +type Mutator interface { + GVRs() []schema.GroupVersionResource + Mutate(obj *unstructured.Unstructured) error +} + type Controller struct { queue workqueue.RateLimitingInterface - mutators mutatorGvrMap + mutators map[schema.GroupVersionResource]Mutator dnsProcessor *dns.DNSProcessor upstreamClient kcpdynamic.ClusterInterface @@ -94,7 +95,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste dnsNamespace string, syncerNamespaceInformerFactory informers.SharedInformerFactory, dnsImage string, - upsyncPods bool) (*Controller, error) { + mutators ...Mutator) (*Controller, error) { c := Controller{ queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName), @@ -118,9 +119,9 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste informer, ok := informers[gvr] if !ok { if shared.ContainsGVR(notSynced, gvr) { - return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr) + return nil, fmt.Errorf("informer for gvr %v not synced in the upstream informer factory", gvr) } - return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr) + return nil, fmt.Errorf("gvr %v should be known in the upstream informer factory", gvr) } return informer.Lister(), nil }, @@ -137,6 +138,8 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste syncTargetUID: syncTargetUID, syncTargetKey: syncTargetKey, advancedSchedulingEnabled: advancedSchedulingEnabled, + + mutators: make(map[schema.GroupVersionResource]Mutator, 2), } logger := logging.WithReconciler(syncerLogger, controllerName) @@ -254,21 +257,10 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste }, }) - secretMutator := specmutators.NewSecretMutator() - - dnsServiceLister := syncerNamespaceInformerFactory.Core().V1().Services().Lister() - - deploymentMutator := specmutators.NewDeploymentMutator(upstreamURL, func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) { - secretLister, err := c.getUpstreamLister(corev1.SchemeGroupVersion.WithResource("secrets")) - if err != nil { - return nil, errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory") + for _, mutator := range mutators { + for _, gvr := range mutator.GVRs() { + c.mutators[gvr] = mutator } - return secretLister.ByCluster(clusterName).ByNamespace(namespace).List(labels.Everything()) - }, dnsServiceLister, syncTargetClusterName, syncTargetUID, syncTargetName, dnsNamespace, upsyncPods) - - c.mutators = mutatorGvrMap{ - deploymentMutator.GVR(): deploymentMutator.Mutate, - secretMutator.GVR(): secretMutator.Mutate, } c.dnsProcessor = dns.NewDNSProcessor(downstreamKubeClient, @@ -276,7 +268,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste syncerNamespaceInformerFactory.Rbac().V1().Roles().Lister(), syncerNamespaceInformerFactory.Rbac().V1().RoleBindings().Lister(), syncerNamespaceInformerFactory.Apps().V1().Deployments().Lister(), - dnsServiceLister, + syncerNamespaceInformerFactory.Core().V1().Services().Lister(), syncerNamespaceInformerFactory.Core().V1().Endpoints().Lister(), syncerNamespaceInformerFactory.Networking().V1().NetworkPolicies().Lister(), syncTargetUID, syncTargetName, dnsNamespace, dnsImage) 
diff --git a/pkg/syncer/spec/spec_process.go b/pkg/syncer/spec/spec_process.go
index 2d3a90105b6..831061301e1 100644
--- a/pkg/syncer/spec/spec_process.go
+++ b/pkg/syncer/spec/spec_process.go
@@ -51,8 +51,6 @@ const (
 	syncerApplyManager = "syncer"
 )
 
-type mutatorGvrMap map[schema.GroupVersionResource]func(obj *unstructured.Unstructured) error
-
 func deepEqualApartFromStatus(logger logr.Logger, oldUnstrob, newUnstrob *unstructured.Unstructured) bool {
 	// TODO(jmprusi): Remove this after switching to virtual workspaces.
 	// remove status annotation from oldObj and newObj before comparing
@@ -443,7 +441,7 @@ func (c *Controller) applyToDownstream(ctx context.Context, gvr schema.GroupVers
 
 	// Run any transformations on the object before we apply it to the downstream cluster.
 	if mutator, ok := c.mutators[gvr]; ok {
-		if err := mutator(downstreamObj); err != nil {
+		if err := mutator.Mutate(downstreamObj); err != nil {
 			return err
 		}
 	}
diff --git a/pkg/syncer/spec/spec_process_test.go b/pkg/syncer/spec/spec_process_test.go
index 418c803d096..4c6da831b67 100644
--- a/pkg/syncer/spec/spec_process_test.go
+++ b/pkg/syncer/spec/spec_process_test.go
@@ -19,6 +19,8 @@ package spec
 import (
 	"context"
 	"encoding/json"
+	"errors"
+	"fmt"
 	"net/url"
 	"strings"
 	"sync"
@@ -38,6 +40,7 @@ import (
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
@@ -54,7 +57,9 @@ import (
 	workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
 	ddsif "github.com/kcp-dev/kcp/pkg/informer"
 	"github.com/kcp-dev/kcp/pkg/syncer/indexers"
+	"github.com/kcp-dev/kcp/pkg/syncer/shared"
 	"github.com/kcp-dev/kcp/pkg/syncer/spec/dns"
+	"github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
 )
 
 var scheme *runtime.Scheme
@@ -1215,9 +1220,27 @@ func TestSpecSyncerProcess(t *testing.T) {
 			mockedCleaner := &mockedCleaner{
 				toClean: sets.String{},
 			}
+
+			secretMutator := mutators.NewSecretMutator()
+			secretsGVR := corev1.SchemeGroupVersion.WithResource("secrets")
+			podspecableMutator := mutators.NewPodspecableMutator(upstreamURL, func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) {
+				informers, notSynced := ddsifForUpstreamSyncer.Informers()
+				informer, ok := informers[secretsGVR]
+				if !ok {
+					if shared.ContainsGVR(notSynced, secretsGVR) {
+						return nil, fmt.Errorf("informer for gvr %v not synced in the upstream informer factory", secretsGVR)
+					}
+					return nil, fmt.Errorf("gvr %v should be known in the upstream informer factory", secretsGVR)
+				}
+				if err != nil {
+					return nil, errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory")
+				}
+				return informer.Lister().ByCluster(clusterName).ByNamespace(namespace).List(labels.Everything())
+			}, toInformerFactory.Core().V1().Services().Lister(), tc.syncTargetClusterName, syncTargetUID, tc.syncTargetName, "kcp-01c0zzvlqsi7n", false)
+
 			controller, err := NewSpecSyncer(logger, kcpLogicalCluster, tc.syncTargetName, syncTargetKey, upstreamURL, tc.advancedSchedulingEnabled, fromClusterClient, toClient, toKubeClient,
 				ddsifForUpstreamSyncer, ddsifForDownstream, mockedCleaner, syncTargetUID,
-				"kcp-01c0zzvlqsi7n", toInformerFactory, "dnsimage", false)
+				"kcp-01c0zzvlqsi7n", toInformerFactory, "dnsimage", secretMutator, podspecableMutator)
 			require.NoError(t, err)
 			toInformerFactory.Start(ctx.Done())
diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go
index 1b3704a0cbc..dcef7c42f83 100644
--- a/pkg/syncer/syncer.go
+++ b/pkg/syncer/syncer.go
@@ -18,6 +18,7 @@ package syncer
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net/url"
 	"time"
@@ -28,6 +29,8 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -52,7 +55,9 @@ import (
 	"github.com/kcp-dev/kcp/pkg/syncer/indexers"
 	"github.com/kcp-dev/kcp/pkg/syncer/namespace"
 	"github.com/kcp-dev/kcp/pkg/syncer/resourcesync"
+	"github.com/kcp-dev/kcp/pkg/syncer/shared"
 	"github.com/kcp-dev/kcp/pkg/syncer/spec"
+	"github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
 	"github.com/kcp-dev/kcp/pkg/syncer/status"
 	"github.com/kcp-dev/kcp/pkg/syncer/upsync"
 	. "github.com/kcp-dev/kcp/tmc/pkg/logging"
@@ -278,9 +283,26 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 		return err
 	}
 
+	secretMutator := mutators.NewSecretMutator()
+	secretsGVR := corev1.SchemeGroupVersion.WithResource("secrets")
+	podspecableMutator := mutators.NewPodspecableMutator(upstreamURL, func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) {
+		informers, notSynced := ddsifForUpstreamSyncer.Informers()
+		informer, ok := informers[secretsGVR]
+		if !ok {
+			if shared.ContainsGVR(notSynced, secretsGVR) {
+				return nil, fmt.Errorf("informer for gvr %v not synced in the upstream informer factory", secretsGVR)
+			}
+			return nil, fmt.Errorf("gvr %v should be known in the upstream informer factory", secretsGVR)
+		}
+		if err != nil {
+			return nil, errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory")
+		}
+		return informer.Lister().ByCluster(clusterName).ByNamespace(namespace).List(labels.Everything())
+	}, syncerNamespaceInformerFactory.Core().V1().Services().Lister(), logicalcluster.From(syncTarget), types.UID(cfg.SyncTargetUID), cfg.SyncTargetName, syncerNamespace, kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel))
+
 	specSyncer, err := spec.NewSpecSyncer(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName, syncTargetKey, upstreamURL, advancedSchedulingEnabled,
 		upstreamSyncerClusterClient, downstreamDynamicClient, downstreamKubeClient, ddsifForUpstreamSyncer, ddsifForDownstream, downstreamNamespaceController, syncTarget.GetUID(),
-		syncerNamespace, syncerNamespaceInformerFactory, cfg.DNSImage, kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel))
+		syncerNamespace, syncerNamespaceInformerFactory, cfg.DNSImage, secretMutator, podspecableMutator)
 	if err != nil {
 		return err
 	}
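For reference, the core technique the rewritten Mutate relies on is reading and writing the pod template generically through unstructured.NestedMap and unstructured.SetNestedMap, which is what lets one mutator serve Deployments, StatefulSets, and ReplicaSets alike. The following standalone program illustrates only that technique; it is not part of the change, and the sample StatefulSet is made up.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
)

func main() {
	// A made-up StatefulSet; any podspecable object with spec.template works the same way.
	statefulSet := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "apps/v1",
		"kind":       "StatefulSet",
		"metadata":   map[string]interface{}{"name": "db", "namespace": "demo"},
		"spec": map[string]interface{}{
			"template": map[string]interface{}{
				"spec": map[string]interface{}{
					"containers": []interface{}{
						map[string]interface{}{"name": "db", "image": "postgres"},
					},
				},
			},
		},
	}}

	// Extract the pod template generically, as PodSpecableMutator.Mutate does.
	podTemplateUnstr, found, err := unstructured.NestedMap(statefulSet.Object, "spec", "template")
	if err != nil || !found {
		panic(fmt.Sprintf("no pod template found: %v", err))
	}

	// Convert it to a typed PodTemplateSpec so it can be mutated with ordinary Go code.
	podTemplate := &corev1.PodTemplateSpec{}
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(podTemplateUnstr, podTemplate); err != nil {
		panic(err)
	}
	podTemplate.Spec.DNSPolicy = corev1.DNSNone // example mutation

	// Convert back and write the result under spec.template of the original object.
	newPodTemplateUnstr, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplate)
	if err != nil {
		panic(err)
	}
	if err := unstructured.SetNestedMap(statefulSet.Object, newPodTemplateUnstr, "spec", "template"); err != nil {
		panic(err)
	}
	fmt.Println(statefulSet.Object["spec"])
}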