Skip to content

Commit

Permalink
Tunnel: Validate namespace/pod at the syncer side
Browse files Browse the repository at this point in the history
  • Loading branch information
jmprusi committed Feb 21, 2023
1 parent 4f71d22 commit fbdc37a
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 8 deletions.
78 changes: 77 additions & 1 deletion pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package syncer
import (
"context"
"fmt"
"net/http"
"net/url"
"time"

Expand All @@ -27,6 +28,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -47,11 +49,13 @@ import (
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
kcpfeatures "github.com/kcp-dev/kcp/pkg/features"
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/server/requestinfo"
"github.com/kcp-dev/kcp/pkg/syncer/controllermanager"
"github.com/kcp-dev/kcp/pkg/syncer/endpoints"
"github.com/kcp-dev/kcp/pkg/syncer/indexers"
"github.com/kcp-dev/kcp/pkg/syncer/namespace"
"github.com/kcp-dev/kcp/pkg/syncer/resourcesync"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/syncer/spec"
"github.com/kcp-dev/kcp/pkg/syncer/status"
"github.com/kcp-dev/kcp/pkg/syncer/upsync"
Expand Down Expand Up @@ -407,7 +411,79 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i

// Start tunneler for POD access
if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName)
filter := func(http *http.Request) bool {
resolver := requestinfo.NewKCPRequestInfoResolver()
requestInfo, err := resolver.NewRequestInfo(http)
if err != nil {
logger.V(4).Error(err, "Failed to get request info")
return false
}

// Ensure that requests are only for pods, and we have the required information, if not, return false.
if requestInfo.Resource != "pods" || requestInfo.Subresource == "" || requestInfo.Name == "" || requestInfo.Namespace == "" {
return false
}
// Ensure that requests are only for pods in a namespace owned by this syncer, if not, return false.
downstreamNamespaceName := requestInfo.Namespace
logger = logger.WithValues("namespace", downstreamNamespaceName)

informers, _ := ddsifForDownstream.Informers()
nsInformer, ok := informers[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}]
if !ok {
logger.V(4).Error(err, "Failed to get namespace informer")
return false
}

obj, err := nsInformer.Lister().Get(downstreamNamespaceName)
if err != nil {
logger.V(4).Error(err, "Failed to get namespace")
return false
}
nsUnstr, ok := obj.(*unstructured.Unstructured)
if !ok {
logger.V(4).Error(err, "Failed to convert object to unstructured.Unstructured")
return false
}

// If the namespace is not owned by this syncer, return false.s
if locator, ok, err := shared.LocatorFromAnnotations(nsUnstr.GetAnnotations()); ok {
if err != nil {
logger.V(4).Error(err, "Failed to get locator from namespace")
return false
}
if locator.SyncTarget.Name != cfg.SyncTargetName || string(locator.SyncTarget.UID) != cfg.SyncTargetUID || locator.SyncTarget.ClusterName != logicalcluster.From(syncTarget).String() {
return false
}
} else {
logger.V(4).Error(err, "Failed to get locator from namespace")
return false
}

// Ensure Pod is in Upsynced state.
podName := requestInfo.Name
podInformer, ok := informers[schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}]
if !ok {
logger.V(4).Error(err, "Failed to get pod informer")
return false
}
obj, err = podInformer.Lister().ByNamespace(downstreamNamespaceName).Get(podName)
if err != nil {
logger.V(4).Error(err, "Failed to get pod", "pod", podName)
return false
}
podUnstr, ok := obj.(*unstructured.Unstructured)
if !ok {
logger.V(4).Error(err, "Failed to convert object to unstructured.Unstructured")
return false
}
if podUnstr.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] != string(workloadv1alpha1.ResourceStateUpsync) {
return false
}

return true
}

go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName, filter)
}

StartHeartbeat(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID)
Expand Down
36 changes: 30 additions & 6 deletions pkg/syncer/tunneler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@ package syncer

import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"time"

"github.com/kcp-dev/logicalcluster/v3"

"github.com/kcp-dev/kcp/pkg/tunneler"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

"github.com/kcp-dev/kcp/pkg/tunneler"
)

// startSyncerTunnel blocks until the context is cancelled trying to establish a tunnel against the specified target.
func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName string) {
func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName string, filter func(*http.Request) bool) {
// connect to create the reverse tunnels
var (
initBackoff = 5 * time.Second
Expand All @@ -51,14 +51,14 @@ func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, s

wait.BackoffUntil(func() {
logger.V(5).Info("starting tunnel")
err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName)
err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName, filter)
if err != nil {
logger.Error(err, "failed to create tunnel")
}
}, backoffMgr, sliding, ctx.Done())
}

func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName string) error {
func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName string, filter func(*http.Request) bool) error {
logger := klog.FromContext(ctx)

// syncer --> kcp
Expand All @@ -85,7 +85,12 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
if err != nil {
return err
}
proxy.Transport = clientDownstream.Transport

if filter == nil {
filter = func(*http.Request) bool { return true }
}

proxy.Transport = newFilteredRoundTripper(clientDownstream.Transport, filter)

// create the reverse connection
// virtual workspaces
Expand Down Expand Up @@ -126,3 +131,22 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
logger.V(2).Info("stop serving on reverse connection")
return err
}

func newFilteredRoundTripper(delegate http.RoundTripper, filter func(*http.Request) bool) http.RoundTripper {
return &filteredRoundTripper{
delegate: delegate,
filter: filter,
}
}

type filteredRoundTripper struct {
delegate http.RoundTripper
filter func(*http.Request) bool
}

func (f *filteredRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if !f.filter(req) {
return nil, fmt.Errorf("request not allowed")
}
return f.delegate.RoundTrip(req)
}
1 change: 0 additions & 1 deletion pkg/tunneler/podsubresourceproxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ func (tn *tunneler) Proxy(clusterName logicalcluster.Name, syncerName string, rw
}

proxy := httputil.NewSingleHostReverseProxy(target)
// director := proxy.Director
proxy.Transport = &http.Transport{
Proxy: nil, // no proxies
DialContext: d.Dial, // use a reverse connection
Expand Down
9 changes: 9 additions & 0 deletions test/e2e/syncer/tunnels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,12 @@ func TestSyncerTunnel(t *testing.T) {
return podLogs.Len() > 1, podLogs.String()
}, wait.ForeverTestTimeout, time.Millisecond*100, "couldn't get downstream pod logs for deployment %s/%s", d.Namespace, d.Name)
}

// TestSyncerTunnelerCrafted checks that the syncer will refuse to accept a request against a pod or namespace that is not part of the syncer's target.
func TestSyncerTunnelerCrafted(t *testing.T) {
t.Parallel()
framework.Suite(t, "transparent-multi-cluster")

defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SyncerTunnel, true)()

}

0 comments on commit fbdc37a

Please sign in to comment.