Skip to content

Commit

Permalink
Merge pull request #2819 from jmprusi/1975-implement-pod-access-autho…
Browse files Browse the repository at this point in the history
…rization-in-the-syncer-itself

🌱  Tunnel: Validate namespace/pod at the syncer side
  • Loading branch information
openshift-merge-robot authored Mar 8, 2023
2 parents af18808 + ec35135 commit 85793f9
Show file tree
Hide file tree
Showing 6 changed files with 588 additions and 11 deletions.
9 changes: 8 additions & 1 deletion pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,14 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i

// Start tunneler for POD access
if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName)
StartSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName, cfg.SyncTargetUID, func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
informers, _ := ddsifForDownstream.Informers()
informer, ok := informers[gvr]
if !ok {
return nil, fmt.Errorf("failed to get informer for gvr: %s", gvr)
}
return informer.Lister(), nil
})
}

StartHeartbeat(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID)
Expand Down
148 changes: 142 additions & 6 deletions pkg/syncer/tunneler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,48 @@ package syncer

import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"time"

"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
"github.com/kcp-dev/kcp/pkg/server/requestinfo"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/tunneler"
)

// startSyncerTunnel blocks until the context is cancelled trying to establish a tunnel against the specified target.
func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName string) {
var (
errorScheme = runtime.NewScheme()
errorCodecs = serializer.NewCodecFactory(errorScheme)
)

func init() {
errorScheme.AddUnversionedTypes(metav1.Unversioned,
&metav1.Status{},
)
}

type ResourceListerFunc func(gvr schema.GroupVersionResource) (cache.GenericLister, error)

// StartSyncerTunnel blocks until the context is cancelled trying to establish a tunnel against the specified target.
func StartSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName, syncTargetUID string, getDownstreamLister ResourceListerFunc) {
// connect to create the reverse tunnels
var (
initBackoff = 5 * time.Second
Expand All @@ -49,16 +74,16 @@ func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, s
backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock)
logger := klog.FromContext(ctx)

wait.BackoffUntil(func() {
go wait.BackoffUntil(func() {
logger.V(5).Info("starting tunnel")
err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName)
err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName, syncTargetUID, getDownstreamLister)
if err != nil {
logger.Error(err, "failed to create tunnel")
}
}, backoffMgr, sliding, ctx.Done())
}

func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName string) error {
func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName, syncTargetUID string, getDownstreamLister ResourceListerFunc) error {
logger := klog.FromContext(ctx)

// syncer --> kcp
Expand All @@ -85,6 +110,7 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
if err != nil {
return err
}

proxy.Transport = clientDownstream.Transport

// create the reverse connection
Expand All @@ -109,7 +135,7 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
defer l.Close()

// reverse proxy the request coming from the reverse connection to the p-cluster apiserver
server := &http.Server{ReadHeaderTimeout: 30 * time.Second, Handler: proxy}
server := &http.Server{ReadHeaderTimeout: 30 * time.Second, Handler: withPodAccessCheck(proxy, getDownstreamLister, syncTargetClusterName, syncTargetName, syncTargetUID)}
defer server.Close()

logger.V(2).Info("serving on reverse connection")
Expand All @@ -126,3 +152,113 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
logger.V(2).Info("stop serving on reverse connection")
return err
}

func withPodAccessCheck(handler http.Handler, getDownstreamLister ResourceListerFunc, synctargetClusterName logicalcluster.Name, synctargetName, syncTargetUID string) http.HandlerFunc {
namespaceGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}

return func(w http.ResponseWriter, req *http.Request) {
resolver := requestinfo.NewKCPRequestInfoResolver()
requestInfo, err := resolver.NewRequestInfo(req)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("could not resolve RequestInfo: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure that requests are only for pods, and we have the required information, if not, return false.
if requestInfo.Resource != "pods" || requestInfo.Subresource == "" || requestInfo.Name == "" || requestInfo.Namespace == "" {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), requestInfo.Name, fmt.Errorf("invalid resource and/or subresource")),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure that requests are only for pods in a namespace owned by this syncer, if not, return false.
downstreamNamespaceName := requestInfo.Namespace

nsInformer, err := getDownstreamLister(namespaceGVR)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting downstream namespace lister: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

obj, err := nsInformer.Get(downstreamNamespaceName)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewForbidden(namespaceGVR.GroupResource(), requestInfo.Namespace, fmt.Errorf("forbidden")),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

downstreamNs, ok := obj.(*unstructured.Unstructured)
if !ok {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("namespace resource should be *unstructured.Unstructured but was: %T", obj)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure the referenced downstream namespace locator is correct and owned by this syncer.
annotations := downstreamNs.GetAnnotations()
if locator, ok, err := shared.LocatorFromAnnotations(annotations); ok {
if err != nil || locator.SyncTarget.Name != synctargetName || string(locator.SyncTarget.UID) != syncTargetUID || locator.SyncTarget.ClusterName != string(synctargetClusterName) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), requestInfo.Name, fmt.Errorf("forbidden")),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
} else {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), requestInfo.Name, fmt.Errorf("forbidden")),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure Pod is in Upsynced state.
podName := requestInfo.Name
podInformer, err := getDownstreamLister(podGVR)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting pod lister: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
obj, err = podInformer.ByNamespace(downstreamNamespaceName).Get(podName)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), requestInfo.Name, fmt.Errorf("forbidden")),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if downstreamPod, ok := obj.(*unstructured.Unstructured); ok {
if downstreamPod.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+workloadv1alpha1.ToSyncTargetKey(synctargetClusterName, synctargetName)] != string(workloadv1alpha1.ResourceStateUpsync) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), requestInfo.Name, fmt.Errorf("forbidden")),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
} else {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("pod resource should be *unstructured.Unstructured but was: %T", obj)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

handler.ServeHTTP(w, req)
}
}
Loading

0 comments on commit 85793f9

Please sign in to comment.