From 5e0a7c96c2dc0218ac693ca7a094016b0c3c56f2 Mon Sep 17 00:00:00 2001 From: Joaquim Moreno Date: Tue, 21 Feb 2023 12:17:45 +0100 Subject: [PATCH] Tunnel: Validate namespace/pod at the syncer side --- pkg/syncer/syncer.go | 9 +- pkg/syncer/tunneler.go | 167 +++++++++++++++++- pkg/tunneler/podsubresourceproxy_handler.go | 1 - test/e2e/framework/syncer.go | 49 +++++- test/e2e/syncer/tunnels_test.go | 185 +++++++++++++++++++- 5 files changed, 400 insertions(+), 11 deletions(-) diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 1b3704a0cbc1..702e4d35d249 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -406,7 +406,14 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i // Start tunneler for POD access if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) { - go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName) + StartSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName, cfg.SyncTargetUID, func(gvr schema.GroupVersionResource) (cache.GenericLister, error) { + informers, _ := ddsifForDownstream.Informers() + informer, ok := informers[gvr] + if !ok { + return nil, fmt.Errorf("failed to get informer for gvr: %s", gvr) + } + return informer.Lister(), nil + }) } StartHeartbeat(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID) diff --git a/pkg/syncer/tunneler.go b/pkg/syncer/tunneler.go index 622ad8bf1961..e46fab1aa560 100644 --- a/pkg/syncer/tunneler.go +++ b/pkg/syncer/tunneler.go @@ -18,6 +18,7 @@ package syncer import ( "context" + "fmt" "net/http" "net/http/httputil" "net/url" @@ -25,16 +26,40 @@ import ( "github.com/kcp-dev/logicalcluster/v3" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/utils/clock" + workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" + "github.com/kcp-dev/kcp/pkg/server/requestinfo" + "github.com/kcp-dev/kcp/pkg/syncer/shared" "github.com/kcp-dev/kcp/pkg/tunneler" ) -// startSyncerTunnel blocks until the context is cancelled trying to establish a tunnel against the specified target. -func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName string) { +var ( + errorScheme = runtime.NewScheme() + errorCodecs = serializer.NewCodecFactory(errorScheme) +) + +func init() { + errorScheme.AddUnversionedTypes(metav1.Unversioned, + &metav1.Status{}, + ) +} + +type ListerFuncForResource func(gvr schema.GroupVersionResource) (cache.GenericLister, error) + +// StartSyncerTunnel blocks until the context is cancelled trying to establish a tunnel against the specified target. 
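+// The getDownstreamLister function gives the tunnel access to the syncer's downstream informers, so that
+// incoming requests can be validated against the downstream namespaces and pods this syncer actually owns.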
+func StartSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName, syncTargetUID string, getDownstreamLister ListerFuncForResource) { // connect to create the reverse tunnels var ( initBackoff = 5 * time.Second @@ -49,16 +74,16 @@ func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, s backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock) logger := klog.FromContext(ctx) - wait.BackoffUntil(func() { + go wait.BackoffUntil(func() { logger.V(5).Info("starting tunnel") - err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName) + err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName, syncTargetUID, getDownstreamLister) if err != nil { logger.Error(err, "failed to create tunnel") } }, backoffMgr, sliding, ctx.Done()) } -func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName string) error { +func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName, syncTargetUID string, getDownstreamLister ListerFuncForResource) error { logger := klog.FromContext(ctx) // syncer --> kcp @@ -85,6 +110,7 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT if err != nil { return err } + proxy.Transport = clientDownstream.Transport // create the reverse connection @@ -109,7 +135,7 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT defer l.Close() // reverse proxy the request coming from the reverse connection to the p-cluster apiserver - server := &http.Server{ReadHeaderTimeout: 30 * time.Second, Handler: proxy} + server := &http.Server{ReadHeaderTimeout: 30 * time.Second, Handler: proxyHandlerFiltered(proxy, getDownstreamLister, syncTargetClusterName, syncTargetName, syncTargetUID)} defer server.Close() logger.V(2).Info("serving on reverse connection") @@ -126,3 +152,132 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT logger.V(2).Info("stop serving on reverse connection") return err } + +func proxyHandlerFiltered(proxy *httputil.ReverseProxy, listerFunc ListerFuncForResource, synctargetClusterName logicalcluster.Name, synctargetName, syncTargetUID string) http.HandlerFunc { + namespaceGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"} + podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + + return func(w http.ResponseWriter, req *http.Request) { + resolver := requestinfo.NewKCPRequestInfoResolver() + requestInfo, err := resolver.NewRequestInfo(req) + if err != nil { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("could not resolve RequestInfo: %w", err)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + + // Ensure that requests are only for pods, and we have the required information, if not, return false. + if requestInfo.Resource != "pods" || requestInfo.Subresource == "" || requestInfo.Name == "" || requestInfo.Namespace == "" { + responsewriters.ErrorNegotiated( + errors.NewForbidden(podGVR.GroupResource(), requestInfo.Name, fmt.Errorf("only pod subresources are allowed")), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + + // Ensure that requests are only for pods in a namespace owned by this syncer, if not, return false. 
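+ // Ownership is established through the namespace locator annotation that the syncer sets on downstream
+ // namespaces: it must reference this syncer's SyncTarget name, UID and cluster.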
+ downstreamNamespaceName := requestInfo.Namespace + + nsInformer, err := listerFunc(namespaceGVR) + if err != nil { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("error while getting downstream namespace lister: %w", err)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + + obj, err := nsInformer.Get(downstreamNamespaceName) + if errors.IsNotFound(err) { + responsewriters.ErrorNegotiated( + errors.NewForbidden(namespaceGVR.GroupResource(), downstreamNamespaceName, fmt.Errorf("namespace %s does not exist", downstreamNamespaceName)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + if err != nil { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("error while getting namespace resource: %w", err)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + + downstreamNs, ok := obj.(*unstructured.Unstructured) + if !ok { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("namespace resource should be *unstructured.Unstructured but was: %T", obj)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + + // Ensure the referenced downstream namespace locator is correct and owned by this syncer. + if locator, ok, err := shared.LocatorFromAnnotations(downstreamNs.GetAnnotations()); ok { + if err != nil { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("error while getting locator from namespace: %w", err)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + if locator.SyncTarget.Name != synctargetName || string(locator.SyncTarget.UID) != syncTargetUID || locator.SyncTarget.ClusterName != string(synctargetClusterName) { + responsewriters.ErrorNegotiated( + errors.NewForbidden(namespaceGVR.GroupResource(), downstreamNamespaceName, fmt.Errorf("namespace %q is not owned by this syncer", downstreamNamespaceName)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + } else { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("locator not found on namespace: %q", downstreamNamespaceName)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + + // Ensure Pod is in Upsynced state. 
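+ // Only pods carrying the cluster resource state label for this SyncTarget with the Upsync state value
+ // are served; anything else is rejected as forbidden.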
+ podName := requestInfo.Name + podInformer, err := listerFunc(podGVR) + if err != nil { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("error while getting pod lister: %w", err)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + obj, err = podInformer.ByNamespace(downstreamNamespaceName).Get(podName) + if errors.IsNotFound(err) { + responsewriters.ErrorNegotiated( + errors.NewForbidden(podGVR.GroupResource(), podName, fmt.Errorf("pod %q does not exist", podName)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + if err != nil { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("error while getting pod resource: %w", err)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + downstreamPod, ok := obj.(*unstructured.Unstructured) + if !ok { + responsewriters.ErrorNegotiated( + errors.NewInternalError(fmt.Errorf("pod resource should be *unstructured.Unstructured but was: %T", obj)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + if downstreamPod.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+workloadv1alpha1.ToSyncTargetKey(synctargetClusterName, synctargetName)] != string(workloadv1alpha1.ResourceStateUpsync) { + responsewriters.ErrorNegotiated( + errors.NewForbidden(podGVR.GroupResource(), podName, fmt.Errorf("pod %q is not in upsynced state", podName)), + errorCodecs, schema.GroupVersion{}, w, req, + ) + return + } + proxy.ServeHTTP(w, req) + } +} diff --git a/pkg/tunneler/podsubresourceproxy_handler.go b/pkg/tunneler/podsubresourceproxy_handler.go index 18269ff562e1..e8efe51bbb45 100644 --- a/pkg/tunneler/podsubresourceproxy_handler.go +++ b/pkg/tunneler/podsubresourceproxy_handler.go @@ -263,7 +263,6 @@ func (tn *tunneler) Proxy(clusterName logicalcluster.Name, syncerName string, rw } proxy := httputil.NewSingleHostReverseProxy(target) - // director := proxy.Director proxy.Transport = &http.Transport{ Proxy: nil, // no proxies DialContext: d.Dial, // use a reverse connection diff --git a/test/e2e/framework/syncer.go b/test/e2e/framework/syncer.go index 5176fa209fa9..7d16bf928f40 100644 --- a/test/e2e/framework/syncer.go +++ b/test/e2e/framework/syncer.go @@ -41,8 +41,11 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + kubernetesinformers "k8s.io/client-go/informers" kubernetesclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -56,6 +59,7 @@ import ( kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster" kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions" workloadcliplugin "github.com/kcp-dev/kcp/pkg/cliplugins/workload/plugin" + "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/syncer" "github.com/kcp-dev/kcp/pkg/syncer/shared" ) @@ -570,7 +574,50 @@ type appliedSyncerFixture struct { SyncerVirtualWorkspaceConfig *rest.Config UpsyncerVirtualWorkspaceConfig *rest.Config - stopHeartBeat context.CancelFunc + stopHeartBeat context.CancelFunc + stopSyncerTunnel context.CancelFunc +} + +func (sf *appliedSyncerFixture) StartSyncerTunnel(t *testing.T) *StartedSyncerFixture { + t.Helper() + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + sf.stopSyncerTunnel = cancelFunc + + downstreamClient, err := 
dynamic.NewForConfig(sf.SyncerConfig.DownstreamConfig) + require.NoError(t, err) + + downstreamInformer := dynamicinformer.NewDynamicSharedInformerFactory(downstreamClient, 10*time.Hour) + downstreamInformer.Start(ctx.Done()) + + podGvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + namespaceGvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"} + informers := make(map[schema.GroupVersionResource]kubernetesinformers.GenericInformer) + + // Let's bootstrap the pod and namespace informers so they are ready to use during tests. + informers[podGvr] = downstreamInformer.ForResource(podGvr) + indexers.AddIfNotPresentOrDie(informers[podGvr].Informer().GetIndexer(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + informers[namespaceGvr] = downstreamInformer.ForResource(namespaceGvr) + + syncer.StartSyncerTunnel(ctx, sf.SyncerConfig.UpstreamConfig, sf.SyncerConfig.DownstreamConfig, sf.SyncTargetClusterName, sf.SyncerConfig.SyncTargetName, sf.SyncerConfig.SyncTargetUID, func(gvr schema.GroupVersionResource) (cache.GenericLister, error) { + if _, ok := informers[gvr]; !ok { + return nil, fmt.Errorf("no informer for %v", gvr) + } + return informers[gvr].Lister(), nil + }) + startedSyncer := &StartedSyncerFixture{ + sf, + } + + return startedSyncer +} + +// StopSyncerTunnel stops the syncer tunnel, the syncer will close the reverse connection and +// pod subresources will not be available anymore. +func (sf *StartedSyncerFixture) StopSyncerTunnel(t *testing.T) { + t.Helper() + + sf.stopSyncerTunnel() } // StartHeartBeat starts the Heartbeat keeper to maintain diff --git a/test/e2e/syncer/tunnels_test.go b/test/e2e/syncer/tunnels_test.go index 116cc570b877..aed88c81adc8 100644 --- a/test/e2e/syncer/tunnels_test.go +++ b/test/e2e/syncer/tunnels_test.go @@ -19,6 +19,7 @@ package syncer import ( "bytes" "context" + "encoding/json" "fmt" "testing" "time" @@ -32,9 +33,10 @@ import ( rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" - kubernetesclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes" featuregatetesting "k8s.io/component-base/featuregate/testing" workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1" @@ -127,7 +129,7 @@ func TestSyncerTunnel(t *testing.T) { require.NoError(t, err) - downstreamKubeClient, err := kubernetesclientset.NewForConfig(syncerFixture.DownstreamConfig) + downstreamKubeClient, err := kubernetes.NewForConfig(syncerFixture.DownstreamConfig) require.NoError(t, err) upstreamKcpClient, err := kcpclientset.NewForConfig(syncerFixture.SyncerConfig.UpstreamConfig) @@ -279,3 +281,182 @@ func TestSyncerTunnel(t *testing.T) { return podLogs.Len() > 1, podLogs.String() }, wait.ForeverTestTimeout, time.Millisecond*100, "couldn't get downstream pod logs for deployment %s/%s", d.Namespace, d.Name) } + +// TestSyncerTunnelFilter ensures that the syncer tunnel will reject trying to access a Pod that is crafted and not actually upsynced. 
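+// The test hand-crafts a downstream namespace and pod (bypassing the syncer) and verifies that pod
+// subresource requests through the tunnel are rejected, both when the pod is not upsynced and when the
+// namespace locator does not belong to this syncer.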
+func TestSyncerTunnelFilter(t *testing.T) { + t.Parallel() + framework.Suite(t, "transparent-multi-cluster") + + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SyncerTunnel, true)() + + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + kcpServer := framework.PrivateKcpServer(t) + orgPath, _ := framework.NewOrganizationFixture(t, kcpServer, framework.TODO_WithoutMultiShardSupport()) + locationPath, locationWs := framework.NewWorkspaceFixture(t, kcpServer, orgPath, framework.TODO_WithoutMultiShardSupport()) + locationWsName := logicalcluster.Name(locationWs.Spec.Cluster) + userPath, userWs := framework.NewWorkspaceFixture(t, kcpServer, orgPath, framework.TODO_WithoutMultiShardSupport()) + userWsName := logicalcluster.Name(userWs.Spec.Cluster) + + // Creating synctarget and deploying the syncer + syncerFixture := framework.NewSyncerFixture(t, kcpServer, locationPath, framework.WithSyncedUserWorkspaces(userWs)).CreateSyncTargetAndApplyToDownstream(t).StartAPIImporter(t).StartHeartBeat(t) + syncerFixture.StartSyncerTunnel(t) + + t.Log("Binding the consumer workspace to the location workspace") + framework.NewBindCompute(t, userWsName.Path(), kcpServer, + framework.WithLocationWorkspaceWorkloadBindOption(locationWsName.Path()), + ).Bind(t) + + kcpClient, err := kcpclientset.NewForConfig(kcpServer.BaseConfig(t)) + require.NoError(t, err) + + syncTarget, err := kcpClient.Cluster(syncerFixture.SyncerConfig.SyncTargetPath).WorkloadV1alpha1().SyncTargets().Get(ctx, + syncerFixture.SyncerConfig.SyncTargetName, + metav1.GetOptions{}, + ) + require.NoError(t, err) + + kcpKubeClusterClient, err := kcpkubernetesclientset.NewForConfig(kcpServer.BaseConfig(t)) + require.NoError(t, err) + + downstreamKubeLikeClient, err := kubernetes.NewForConfig(syncerFixture.SyncerConfig.DownstreamConfig) + require.NoError(t, err) + + upstreamNs, err := kcpKubeClusterClient.CoreV1().Namespaces().Cluster(userPath).Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-syncer", + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + nsLocator := shared.NamespaceLocator{ + SyncTarget: shared.SyncTargetLocator{ + ClusterName: string(logicalcluster.From(syncTarget)), + Name: syncTarget.Name, + UID: syncTarget.UID, + }, + ClusterName: logicalcluster.From(upstreamNs), + Namespace: upstreamNs.Name, + } + + downstreamNsName, err := shared.PhysicalClusterNamespaceName(nsLocator) + require.NoError(t, err) + + // Convert the locator to json, as we need to set it on the namespace. + locatorJSON, err := json.Marshal(nsLocator) + require.NoError(t, err) + + // Create a namespace on the downstream cluster that matches the kcp namespace, with a correct locator. + _, err = downstreamKubeLikeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: downstreamNsName, + Labels: map[string]string{ + workloadv1alpha1.InternalDownstreamClusterLabel: workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(syncTarget), syncTarget.Name), + }, + Annotations: map[string]string{ + shared.NamespaceLocatorAnnotation: string(locatorJSON), + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + // Create a pod downstream that is not upsynced, to ensure that the syncer tunnel will reject it. 
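+ // The pod carries the state label for this SyncTarget with an empty value instead of "Upsync", so the
+ // tunnel filter must treat it as not upsynced.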
+ _, err = downstreamKubeLikeClient.CoreV1().Pods(downstreamNsName).Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Finalizers: []string{ + shared.SyncerFinalizerNamePrefix + workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(syncTarget), syncTarget.Name), + }, + Labels: map[string]string{ + workloadv1alpha1.InternalDownstreamClusterLabel: workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(syncTarget), syncTarget.Name), + workloadv1alpha1.ClusterResourceStateLabelPrefix + workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(syncTarget), syncTarget.Name): "", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + }, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + // Create a pod on the upstream namespace that looks like the downstream pod being upsynced. + upsyncedClient, err := kcpkubernetesclientset.NewForConfig(syncerFixture.UpsyncerVirtualWorkspaceConfig) + require.NoError(t, err) + + upsyncedPod, err := upsyncedClient.CoreV1().Pods().Cluster(userWsName.Path()).Namespace(upstreamNs.Name).Create(ctx, &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Finalizers: []string{}, + Labels: map[string]string{ + workloadv1alpha1.ClusterResourceStateLabelPrefix + workloadv1alpha1.ToSyncTargetKey(logicalcluster.From(syncTarget), syncTarget.Name): "Upsync", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + }, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + + framework.Eventually(t, func() (bool, string) { + expectedError := fmt.Sprintf("unknown (get pods %s)", upsyncedPod.Name) + request := kcpKubeClusterClient.Cluster(userWsName.Path()).CoreV1().Pods(upstreamNs.Name).GetLogs(upsyncedPod.Name, &corev1.PodLogOptions{}) + _, err = request.Do(ctx).Raw() + if err != nil { + if err.Error() == expectedError { + return true, "" + } + return false, fmt.Sprintf("Returned error: %s is different from expected error: %s", err.Error(), expectedError) + } + return false, "no error returned from get logs" + }, wait.ForeverTestTimeout, time.Millisecond*100, "") + + // Update the downstream namespace locator to point to another synctarget. + locatorJSON, err = json.Marshal(shared.NamespaceLocator{ + SyncTarget: shared.SyncTargetLocator{ + ClusterName: "another-cluster", + Name: "another-sync-target", + UID: "another-sync-target-uid", + }, + ClusterName: logicalcluster.From(upstreamNs), + Namespace: upstreamNs.Name, + }) + require.NoError(t, err) + + // Get a more privileged client to be able to update namespaces. + downstreamAdminKubeClient, err := kubernetes.NewForConfig(syncerFixture.DownstreamConfig) + require.NoError(t, err) + + // Patch the namespace to update the locator. + namespacePatch, err := json.Marshal(map[string]interface{}{ + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{ + shared.NamespaceLocatorAnnotation: string(locatorJSON), + }, + }, + }) + require.NoError(t, err) + _, err = downstreamAdminKubeClient.CoreV1().Namespaces().Patch(ctx, downstreamNsName, types.MergePatchType, namespacePatch, metav1.PatchOptions{}) + require.NoError(t, err) + + // Let's try to get the pod logs again, this should fail, as the downstream Pod is not actually upsynced. 
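+ // This time the request must be rejected because the namespace locator no longer references this
+ // syncer's SyncTarget.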
+ framework.Eventually(t, func() (bool, string) { + expectedError := fmt.Sprintf("unknown (get pods %s)", upsyncedPod.Name) + request := kcpKubeClusterClient.Cluster(userWsName.Path()).CoreV1().Pods(upstreamNs.Name).GetLogs(upsyncedPod.Name, &corev1.PodLogOptions{}) + _, err = request.Do(ctx).Raw() + if err != nil { + if err.Error() == expectedError { + return true, "" + } + return false, fmt.Sprintf("Returned error: %s is different from expected error: %s", err.Error(), expectedError) + } + return false, "no error returned from get logs" + }, wait.ForeverTestTimeout, time.Millisecond*100, "") +}