diff --git a/pkg/syncer/resourcesync/controller.go b/pkg/syncer/resourcesync/controller.go index 4f3604e52cb3..82b1ad1975ad 100644 --- a/pkg/syncer/resourcesync/controller.go +++ b/pkg/syncer/resourcesync/controller.go @@ -73,7 +73,7 @@ const ( // (one for downstream and 2 for upstream, for syncing and upsyncing). func NewSyncTargetGVRSource( syncerLogger logr.Logger, - upstreamSyncerDiscovery discovery.DiscoveryInterface, + downstreamSyncerDiscoveryClient discovery.DiscoveryInterface, upstreamDynamicClusterClient kcpdynamic.ClusterInterface, downstreamDynamicClient dynamic.Interface, downstreamKubeClient kubernetes.Interface, @@ -84,14 +84,14 @@ func NewSyncTargetGVRSource( syncTargetUID types.UID, ) (*controller, error) { c := &controller{ - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName), - upstreamSyncerDiscoveryClient: memory.NewMemCacheClient(upstreamSyncerDiscovery), - downstreamKubeClient: downstreamKubeClient, - syncTargetClient: syncTargetClient, - syncTargetUID: syncTargetUID, - syncTargetLister: syncTargetInformer.Lister(), - synctargetInformerHasSynced: syncTargetInformer.Informer().HasSynced, - gvrsToWatch: map[schema.GroupVersionResource]informer.GVRPartialMetadata{}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName), + downstreamSyncerDiscoveryClient: memory.NewMemCacheClient(downstreamSyncerDiscoveryClient), + downstreamKubeClient: downstreamKubeClient, + syncTargetClient: syncTargetClient, + syncTargetUID: syncTargetUID, + syncTargetLister: syncTargetInformer.Lister(), + synctargetInformerHasSynced: syncTargetInformer.Informer().HasSynced, + gvrsToWatch: map[schema.GroupVersionResource]informer.GVRPartialMetadata{}, } logger := logging.WithReconciler(syncerLogger, controllerName) @@ -128,7 +128,7 @@ type controller struct { queue workqueue.RateLimitingInterface downstreamKubeClient kubernetes.Interface - upstreamSyncerDiscoveryClient discovery.CachedDiscoveryInterface + downstreamSyncerDiscoveryClient discovery.CachedDiscoveryInterface syncTargetUID types.UID syncTargetLister workloadv1alpha1listers.SyncTargetLister @@ -273,7 +273,7 @@ func (c *controller) process(ctx context.Context, key string) error { requiredGVRs := getAllGVRs(syncTarget) - c.upstreamSyncerDiscoveryClient.Invalidate() + c.downstreamSyncerDiscoveryClient.Invalidate() var errs []error var unauthorizedGVRs []string @@ -432,7 +432,7 @@ func (c *controller) addGVR(ctx context.Context, gvr schema.GroupVersionResource } func (c *controller) getGVRPartialMetadata(gvr schema.GroupVersionResource) (*informer.GVRPartialMetadata, error) { - apiResourceList, err := c.upstreamSyncerDiscoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String()) + apiResourceList, err := c.downstreamSyncerDiscoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String()) if err != nil { return nil, err } diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 95ff8f616efa..294a66df9d81 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -22,7 +22,6 @@ import ( "net/url" "time" - kcpdiscovery "github.com/kcp-dev/client-go/discovery" kcpdynamic "github.com/kcp-dev/client-go/dynamic" "github.com/kcp-dev/logicalcluster/v3" @@ -33,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" kubernetesinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -198,14 +198,10 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i // syncerNamespaceInformerFactory to watch some DNS-related resources in the dns namespace syncerNamespaceInformerFactory := kubernetesinformers.NewSharedInformerFactoryWithOptions(downstreamKubeClient, resyncPeriod, kubernetesinformers.WithNamespace(syncerNamespace)) - upstreamSyncerDiscoveryClient, err := kcpdiscovery.NewForConfig(upstreamConfig) - if err != nil { - return err - } - + downstreamSyncerDiscoveryClient := discovery.NewDiscoveryClient(downstreamKubeClient.RESTClient()) syncTargetGVRSource, err := resourcesync.NewSyncTargetGVRSource( logger, - upstreamSyncerDiscoveryClient.DiscoveryInterface, + downstreamSyncerDiscoveryClient, upstreamSyncerClusterClient, downstreamDynamicClient, downstreamKubeClient, @@ -219,7 +215,18 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i return err } - ddsifForUpstreamSyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamSyncerClusterClient, nil, nil, syncTargetGVRSource, cache.Indexers{}) + ddsifForUpstreamSyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamSyncerClusterClient, nil, nil, + &filteredGVRSource{ + GVRSource: syncTargetGVRSource, + keepGVR: func(gvr schema.GroupVersionResource) bool { + // Don't expose pods or endpoints via the syncer vw + if gvr.Group == corev1.GroupName && (gvr.Resource == "pods" || gvr.Resource == "endpoints") { + return false + } + return true + }, + }, + cache.Indexers{}) if err != nil { return err } diff --git a/pkg/virtual/syncer/builder/build.go b/pkg/virtual/syncer/builder/build.go index 2005259ad4f3..c5f41af0ec81 100644 --- a/pkg/virtual/syncer/builder/build.go +++ b/pkg/virtual/syncer/builder/build.go @@ -83,7 +83,13 @@ func BuildVirtualWorkspace( virtualWorkspaceName: SyncerVirtualWorkspaceName, filteredResourceState: workloadv1alpha1.ResourceStateSync, restProviderBuilder: NewSyncerRestProvider, - allowedAPIFilter: nil, + allowedAPIFilter: func(apiGroupResource schema.GroupResource) bool { + // Don't expose Endpoints or Pods via the Syncer VirtualWorkspace. + if apiGroupResource.Group == "" && (apiGroupResource.Resource == "pods" || apiGroupResource.Resource == "endpoints") { + return false + } + return true + }, transformer: &transformations.SyncerResourceTransformer{ TransformationProvider: &transformations.SpecDiffTransformation{}, SummarizingRulesProvider: &transformations.DefaultSummarizingRules{}, @@ -99,7 +105,7 @@ func BuildVirtualWorkspace( restProviderBuilder: NewUpSyncerRestProvider, allowedAPIFilter: func(apiGroupResource schema.GroupResource) bool { // Only allow persistentvolumes and Pods to be Upsynced. - return apiGroupResource.Group == "" && apiGroupResource.Resource == "persistentvolumes" || apiGroupResource.Group == "" && apiGroupResource.Resource == "pods" + return apiGroupResource.Group == "" && (apiGroupResource.Resource == "persistentvolumes" || apiGroupResource.Resource == "pods") }, transformer: &upsyncer.UpsyncerResourceTransformer{}, storageWrapperBuilder: upsyncer.WithStaticLabelSelectorAndInWriteCallsCheck, diff --git a/test/e2e/virtual/syncer/virtualworkspace_test.go b/test/e2e/virtual/syncer/virtualworkspace_test.go index 594db355e81b..cb0f9416a5b4 100644 --- a/test/e2e/virtual/syncer/virtualworkspace_test.go +++ b/test/e2e/virtual/syncer/virtualworkspace_test.go @@ -156,27 +156,9 @@ func withRootComputeAPIResourceList(workspaceName logicalcluster.Name, rootCompu Verbs: metav1.Verbs{"get", "patch", "update"}, StorageVersionHash: "", }, - metav1.APIResource{ - Kind: "Pod", - Name: "pods", - SingularName: "pod", - Namespaced: true, - Verbs: metav1.Verbs{"get", "list", "patch", "update", "watch"}, - ShortNames: []string{"po"}, - Categories: []string{"all"}, - StorageVersionHash: discovery.StorageVersionHash(rootComputeLogicalCluster, "", "v1", "Pod"), - }, - metav1.APIResource{ - Kind: "Pod", - Name: "pods/status", - SingularName: "", - Namespaced: true, - Verbs: metav1.Verbs{"get", "patch", "update"}, - StorageVersionHash: "", - }, ) - return sortAPIResourceList([]*metav1.APIResourceList{ + return []*metav1.APIResourceList{ deploymentsAPIResourceList(rootComputeLogicalCluster), { TypeMeta: metav1.TypeMeta{ @@ -204,7 +186,7 @@ func withRootComputeAPIResourceList(workspaceName logicalcluster.Name, rootCompu }, }, coreResourceList, - }) + } } func logWithTimestampf(t *testing.T, format string, args ...interface{}) {