Tunnel: Validate namespace/pod at the syncer side
jmprusi committed Feb 27, 2023
1 parent 4f71d22 commit 75a98a2
Showing 5 changed files with 390 additions and 11 deletions.
10 changes: 9 additions & 1 deletion pkg/syncer/syncer.go
@@ -407,7 +407,15 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i

// Start tunneler for POD access
if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName)
listerFunc := func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
informers, _ := ddsifForDownstream.Informers()
informer, ok := informers[gvr]
if !ok {
return nil, fmt.Errorf("failed to get informer for gvr: %s", gvr)
}
return informer.Lister(), nil
}
StartSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName, cfg.SyncTargetUID, listerFunc)
}

StartHeartbeat(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID)
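The listerFunc wiring above decouples the tunnel from the syncer's downstream informer factory: anything that can hand back a cache.GenericLister per GVR satisfies the new hook. A minimal sketch of meeting the ListerFuncForResource contract in a unit test, backed by in-memory indexers instead of a live factory (the fakeListers helper and its package name are assumptions, not part of this commit):

package syncertest

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/tools/cache"

	"github.com/kcp-dev/kcp/pkg/syncer"
)

// fakeListers preloads one in-memory store per GVR and returns a
// ListerFuncForResource that serves listers from those stores.
func fakeListers(objsByGVR map[schema.GroupVersionResource][]runtime.Object) syncer.ListerFuncForResource {
	listers := map[schema.GroupVersionResource]cache.GenericLister{}
	for gvr, objs := range objsByGVR {
		indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
		for _, obj := range objs {
			_ = indexer.Add(obj)
		}
		listers[gvr] = cache.NewGenericLister(indexer, gvr.GroupResource())
	}
	return func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
		lister, ok := listers[gvr]
		if !ok {
			return nil, fmt.Errorf("no lister for %s", gvr)
		}
		return lister, nil
	}
}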
165 changes: 159 additions & 6 deletions pkg/syncer/tunneler.go
@@ -18,23 +18,48 @@ package syncer

import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"time"

"github.com/kcp-dev/logicalcluster/v3"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
"github.com/kcp-dev/kcp/pkg/server/requestinfo"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/tunneler"
)

// startSyncerTunnel blocks until the context is cancelled trying to establish a tunnel against the specified target.
func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName string) {
var (
errorScheme = runtime.NewScheme()
errorCodecs = serializer.NewCodecFactory(errorScheme)
)

func init() {
errorScheme.AddUnversionedTypes(metav1.Unversioned,
&metav1.Status{},
)
}

type ListerFuncForResource func(gvr schema.GroupVersionResource) (cache.GenericLister, error)

// StartSyncerTunnel starts a goroutine that repeatedly tries to establish a tunnel against the specified target, backing off between attempts, until the context is cancelled.
func StartSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName, syncTargetUID string, listerFunc ListerFuncForResource) {
// connect to create the reverse tunnels
var (
initBackoff = 5 * time.Second
@@ -49,16 +74,16 @@ func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, s
backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock)
logger := klog.FromContext(ctx)

wait.BackoffUntil(func() {
go wait.BackoffUntil(func() {
logger.V(5).Info("starting tunnel")
err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName)
err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName, syncTargetUID, listerFunc)
if err != nil {
logger.Error(err, "failed to create tunnel")
}
}, backoffMgr, sliding, ctx.Done())
}

func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName string) error {
func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName, syncTargetUID string, listerFunc ListerFuncForResource) error {
logger := klog.FromContext(ctx)

// syncer --> kcp
@@ -85,6 +110,7 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
if err != nil {
return err
}

proxy.Transport = clientDownstream.Transport

// create the reverse connection
@@ -109,7 +135,7 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
defer l.Close()

// reverse proxy the request coming from the reverse connection to the p-cluster apiserver
server := &http.Server{ReadHeaderTimeout: 30 * time.Second, Handler: proxy}
server := &http.Server{ReadHeaderTimeout: 30 * time.Second, Handler: proxyHandlerFiltered(proxy, listerFunc, syncTargetClusterName, syncTargetName, syncTargetUID)}
defer server.Close()

logger.V(2).Info("serving on reverse connection")
@@ -126,3 +152,130 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
logger.V(2).Info("stop serving on reverse connection")
return err
}

func proxyHandlerFiltered(proxy *httputil.ReverseProxy, listerFunc ListerFuncForResource, syncTargetClusterName logicalcluster.Name, syncTargetName, syncTargetUID string) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
resolver := requestinfo.NewKCPRequestInfoResolver()
requestInfo, err := resolver.NewRequestInfo(req)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("could not resolve RequestInfo: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

namespaceGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}

// Ensure the request is for a pod subresource and carries the required information; otherwise reject it as forbidden.
if requestInfo.Resource != "pods" || requestInfo.Subresource == "" || requestInfo.Name == "" || requestInfo.Namespace == "" {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), requestInfo.Name, fmt.Errorf("only pod subresources are allowed")),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
// Ensure the requested pod lives in a namespace owned by this syncer; otherwise reject the request.
downstreamNamespaceName := requestInfo.Namespace

nsLister, err := listerFunc(namespaceGVR)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting namespace lister: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

obj, err := nsLister.Get(downstreamNamespaceName)
if errors.IsNotFound(err) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(namespaceGVR.GroupResource(), downstreamNamespaceName, fmt.Errorf("namespace %s does not exist", downstreamNamespaceName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting namespace resource: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

downstreamNs, ok := obj.(*corev1.Namespace)
if !ok {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while converting namespace resource to corev1.Namespace: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
// Ensure the downstream namespace carries a locator that is owned by this syncer.
locator, ok, err := shared.LocatorFromAnnotations(downstreamNs.GetAnnotations())
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting locator from namespace: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if !ok {
responsewriters.ErrorNegotiated(
errors.NewForbidden(namespaceGVR.GroupResource(), downstreamNamespaceName, fmt.Errorf("namespace %s has no namespace locator annotation", downstreamNamespaceName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if locator.SyncTarget.Name != syncTargetName || string(locator.SyncTarget.UID) != syncTargetUID || locator.SyncTarget.ClusterName != string(syncTargetClusterName) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(namespaceGVR.GroupResource(), downstreamNamespaceName, fmt.Errorf("namespace %s is not owned by this syncer", downstreamNamespaceName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure Pod is in Upsynced state.
podName := requestInfo.Name
podLister, err := listerFunc(podGVR)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting pod lister: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
obj, err = podLister.ByNamespace(downstreamNamespaceName).Get(podName)
if errors.IsNotFound(err) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), podName, fmt.Errorf("pod %s does not exist", podName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting pod resource: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
downstreamPod, ok := obj.(*corev1.Pod)
if !ok {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while converting pod resource to unstructured: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if downstreamPod.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+workloadv1alpha1.ToSyncTargetKey(syncTargetClusterName, syncTargetName)] != string(workloadv1alpha1.ResourceStateUpsync) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), podName, fmt.Errorf("pod %s is not in upsynced state", podName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
proxy.ServeHTTP(w, req)
}
}
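Because proxyHandlerFiltered is unexported, its rejection paths are easiest to exercise from an in-package test. A hypothetical sketch (not part of this commit, and assuming the KCP RequestInfo resolver falls back to standard path parsing for non-/clusters/ paths) of the first branch; the request is rejected before any lister lookup, so a failing lister func and a nil proxy are safe here:

package syncer

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"testing"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/tools/cache"
)

func TestProxyHandlerFilteredRejectsNonPodRequests(t *testing.T) {
	noListers := func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
		return nil, fmt.Errorf("no lister for %s", gvr)
	}
	// nil proxy: the handler must reject the request before proxying.
	handler := proxyHandlerFiltered(nil, noListers, "root:org:ws", "my-synctarget", "my-synctarget-uid")

	// A plain resource request (not a pod subresource) must be forbidden.
	req := httptest.NewRequest(http.MethodGet, "/api/v1/namespaces/default/secrets/foo", nil)
	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, req)

	if rec.Code != http.StatusForbidden {
		t.Fatalf("expected 403 Forbidden, got %d", rec.Code)
	}
}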
1 change: 0 additions & 1 deletion pkg/tunneler/podsubresourceproxy_handler.go
@@ -263,7 +263,6 @@ func (tn *tunneler) Proxy(clusterName logicalcluster.Name, syncerName string, rw
}

proxy := httputil.NewSingleHostReverseProxy(target)
// director := proxy.Director
proxy.Transport = &http.Transport{
Proxy: nil, // no proxies
DialContext: d.Dial, // use a reverse connection
55 changes: 54 additions & 1 deletion test/e2e/framework/syncer.go
@@ -41,8 +41,10 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
kubernetesinformers "k8s.io/client-go/informers"
kubernetesclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
@@ -558,7 +560,58 @@ type appliedSyncerFixture struct {
SyncerVirtualWorkspaceConfig *rest.Config
UpsyncerVirtualWorkspaceConfig *rest.Config

stopHeartBeat context.CancelFunc
stopHeartBeat context.CancelFunc
stopSyncerTunnel context.CancelFunc
}

func (sf *appliedSyncerFixture) StartSyncerTunnel(t *testing.T) *StartedSyncerFixture {
t.Helper()
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
sf.stopSyncerTunnel = cancelFunc

downstreamClient, err := kubernetesclient.NewForConfig(sf.SyncerConfig.DownstreamConfig)
require.NoError(t, err)

downstreamInformer := kubernetesinformers.NewSharedInformerFactoryWithOptions(downstreamClient, 10*time.Hour, kubernetesinformers.WithTweakListOptions(
func(options *metav1.ListOptions) {
options.LabelSelector = workloadv1alpha1.InternalDownstreamClusterLabel + "=" + workloadv1alpha1.ToSyncTargetKey(sf.SyncTargetClusterName, sf.SyncerConfig.SyncTargetName)
},
))

podGvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
namespaceGvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}
informers := make(map[schema.GroupVersionResource]kubernetesinformers.GenericInformer)

informers[podGvr], err = downstreamInformer.ForResource(podGvr)
require.NoError(t, err)
informers[namespaceGvr], err = downstreamInformer.ForResource(namespaceGvr)
require.NoError(t, err)

downstreamInformer.Start(ctx.Done())
downstreamInformer.WaitForCacheSync(ctx.Done())

listerFunc := func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
if _, ok := informers[gvr]; !ok {
return nil, fmt.Errorf("no informer for %v", gvr)
}
return informers[gvr].Lister(), nil
}

syncer.StartSyncerTunnel(ctx, sf.SyncerConfig.UpstreamConfig, sf.SyncerConfig.DownstreamConfig, sf.SyncTargetClusterName, sf.SyncerConfig.SyncTargetName, sf.SyncerConfig.SyncTargetUID, listerFunc)
startedSyncer := &StartedSyncerFixture{
sf,
}

return startedSyncer
}

// StopSyncerTunnel stops the syncer tunnel; the syncer closes the reverse connection,
// and pod subresources become unavailable.
func (sf *StartedSyncerFixture) StopSyncerTunnel(t *testing.T) {
t.Helper()

sf.stopSyncerTunnel()
}

// StartHeartBeat starts the Heartbeat keeper to maintain
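A hypothetical in-package helper (the surrounding test and the pod-exercising steps are assumptions, not part of this commit) showing how the two new fixture methods compose in an e2e flow:

package framework

import "testing"

func testSyncerTunnelLifecycle(t *testing.T, fixture *appliedSyncerFixture) {
	t.Helper()

	// Establish the reverse tunnel; informers are synced before returning.
	started := fixture.StartSyncerTunnel(t)

	// ... exercise pod subresources (logs, exec) through the tunnel ...

	// Drop the reverse connection; subsequent pod subresource requests
	// should fail until the tunnel is restarted.
	started.StopSyncerTunnel(t)
}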