Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Support for pod logs and other subresources #2401

Merged
6 changes: 6 additions & 0 deletions pkg/cliplugins/workload/plugin/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ func (o *SyncOptions) enableSyncerForWorkspace(ctx context.Context, config *rest
ResourceNames: []string{syncTargetName},
Resources: []string{"synctargets"},
},
{
Verbs: []string{"get"},
APIGroups: []string{workloadv1alpha1.SchemeGroupVersion.Group},
ResourceNames: []string{syncTargetName},
Resources: []string{"synctargets/tunnel"},
},
{
Verbs: []string{"get", "list", "watch"},
APIGroups: []string{workloadv1alpha1.SchemeGroupVersion.Group},
Expand Down
14 changes: 10 additions & 4 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,16 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) {
apiHandler = WithRequestIdentity(apiHandler)
apiHandler = authorization.WithDeepSubjectAccessReview(apiHandler)

if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
tunneler := tunneler.NewTunneler()
apiHandler = tunneler.WithSyncerTunnelHandler(apiHandler)
apiHandler = tunneler.WithPodSubresourceProxying(
apiHandler,
c.DynamicClusterClient,
c.KcpSharedInformerFactory,
)
}

// The following ensures that only the default main api handler chain executes authorizers which log audit messages.
// All other invocations of the same authorizer chain still work but do not produce audit log entries.
// This compromises audit log size and information overflow vs. having audit reasons for the main api handler only.
Expand Down Expand Up @@ -425,10 +435,6 @@ func NewConfig(opts *kcpserveroptions.CompletedOptions) (*Config, error) {
*c.preHandlerChainMux = append(*c.preHandlerChainMux, mux)
apiHandler = mux

if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
apiHandler = tunneler.WithSyncerTunnel(apiHandler)
}

apiHandler = kcpfilters.WithAuditEventClusterAnnotation(apiHandler)
apiHandler = WithAuditAnnotation(apiHandler) // Must run before any audit annotation is made
apiHandler = WithLocalProxy(apiHandler, opts.Extra.ShardName, opts.Extra.ShardBaseURL, c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters())
Expand Down
111 changes: 76 additions & 35 deletions pkg/tunneler/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package tunneler

import (
"bytes"
"fmt"
"io"
"net/http"
Expand All @@ -26,9 +27,37 @@ import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
)

func setup(t *testing.T) (*http.Client, string, func()) {
// requestInfoHandler is a helping function to populate the requestInfo of a request as expected
// by the WithSyncerTunnelHandler.
func requestInfoHandler(handler http.Handler) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if _, ok := genericapirequest.RequestInfoFrom(ctx); ok {
handler.ServeHTTP(w, r)
return
}
r = r.WithContext(genericapirequest.WithRequestInfo(ctx,
&genericapirequest.RequestInfo{
IsResourceRequest: true,
APIGroup: "workload.kcp.io",
APIVersion: "v1alpha1",
Resource: "synctargets",
Subresource: "tunnel",
Name: "d001",
},
))
r = r.WithContext(genericapirequest.WithCluster(r.Context(), genericapirequest.Cluster{Name: "ws"}))
handler.ServeHTTP(w, r)
})
}

func setup(t *testing.T) (string, *tunneler, func()) {
t.Helper()
backend := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello world")
Expand All @@ -38,17 +67,19 @@ func setup(t *testing.T) (*http.Client, string, func()) {

// public server
mux := http.NewServeMux()
apiHandler := WithSyncerTunnel(mux)
tunneler := NewTunneler()
apiHandler := tunneler.WithSyncerTunnelHandler(mux)
apiHandler = requestInfoHandler(apiHandler)
publicServer := httptest.NewUnstartedServer(apiHandler)
publicServer.EnableHTTP2 = true
publicServer.StartTLS()

// private server
dstUrl, err := SyncerTunnelURL(publicServer.URL, "ws", "d001")
dstURL, err := SyncerTunnelURL(publicServer.URL, "ws", "d001")
if err != nil {
t.Fatal(err)
}
l, err := NewListener(publicServer.Client(), dstUrl)
l, err := NewListener(publicServer.Client(), dstURL)
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,22 +104,25 @@ func setup(t *testing.T) (*http.Client, string, func()) {
publicServer.Close()
backend.Close()
}
return publicServer.Client(), dstUrl + "/" + cmdTunnelProxy + "/", stop
return dstURL, tunneler, stop
}

func Test_integration(t *testing.T) {
client, uri, stop := setup(t)
resp, err := client.Get(uri) //nolint:noctx
if err != nil {
t.Fatalf("Request Failed: %s", err)
}
defer resp.Body.Close()
uri, tunneler, stop := setup(t)
rw := httptest.NewRecorder()
b := &bytes.Buffer{}
req, err := http.NewRequest(http.MethodGet, uri, b) //nolint:noctx
require.NoError(t, err)
tunneler.Proxy("ws", "d001", rw, req)
defer stop()

body, err := io.ReadAll(resp.Body)
response := rw.Result()
body, err := io.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
t.Fatalf("Reading body failed: %s", err)
}

// Log the request body
bodyString := string(body)
if bodyString != "Hello world" {
Expand All @@ -97,23 +131,26 @@ func Test_integration(t *testing.T) {
}

func Test_integration_multiple_connections(t *testing.T) {
client, uri, stop := setup(t)
uri, tunneler, stop := setup(t)
defer stop()
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := client.Get(uri) //nolint:noctx
if err != nil {
t.Errorf("Request Failed: %s", err)
}
defer resp.Body.Close()
rw := httptest.NewRecorder()
b := &bytes.Buffer{}
req, err := http.NewRequest(http.MethodGet, uri, b) //nolint:noctx
require.NoError(t, err)
tunneler.Proxy("ws", "d001", rw, req)

body, err := io.ReadAll(resp.Body)
response := rw.Result()
body, err := io.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
t.Errorf("Reading body failed: %s", err)
}

// Log the request body
bodyString := string(body)
if bodyString != "Hello world" {
Expand All @@ -134,7 +171,9 @@ func Test_integration_listener_reconnect(t *testing.T) {

// public server
mux := http.NewServeMux()
apiHandler := WithSyncerTunnel(mux)
tunneler := NewTunneler()
apiHandler := tunneler.WithSyncerTunnelHandler(mux)
apiHandler = requestInfoHandler(apiHandler)
publicServer := httptest.NewUnstartedServer(apiHandler)
publicServer.EnableHTTP2 = true
publicServer.StartTLS()
Expand Down Expand Up @@ -166,16 +205,15 @@ func Test_integration_listener_reconnect(t *testing.T) {
// wait for the reverse connection to be established
time.Sleep(1 * time.Second)

client := publicServer.Client()
uri := dstURL + "/" + cmdTunnelProxy + "/"
rw := httptest.NewRecorder()
b := &bytes.Buffer{}
req, err := http.NewRequest(http.MethodGet, dstURL, b) //nolint:noctx
require.NoError(t, err)
tunneler.Proxy("ws", "d001", rw, req)

resp, err := client.Get(uri) //nolint:noctx
if err != nil {
t.Fatalf("Request Failed: %s", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
response := rw.Result()
body, err := io.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
t.Fatalf("Reading body failed: %s", err)
}
Expand All @@ -201,16 +239,19 @@ func Test_integration_listener_reconnect(t *testing.T) {
go server2.Serve(l2)
defer server2.Close()

resp, err = client.Get(uri) //nolint:noctx
if err != nil {
t.Fatalf("Request Failed: %s", err)
}
defer resp.Body.Close()
rw2 := httptest.NewRecorder()
b2 := &bytes.Buffer{}
req2, err := http.NewRequest(http.MethodGet, dstURL, b2) //nolint:noctx
require.NoError(t, err)
tunneler.Proxy("ws", "d001", rw2, req2)

body, err = io.ReadAll(resp.Body)
response = rw2.Result()
body, err = io.ReadAll(response.Body)
defer response.Body.Close()
if err != nil {
t.Fatalf("Reading body failed: %s", err)
}

// Log the request body
bodyString = string(body)
if bodyString != "Hello world" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tunneler/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (ln *Listener) sendMessage(m controlMsg) {
}

func (ln *Listener) dial() (net.Conn, error) {
connect := ln.url + "/" + cmdTunnelConnect
connect := ln.url + "/" + tunnelSubresourcePath
pr, pw := io.Pipe()
req, err := http.NewRequest(http.MethodGet, connect, pr) //nolint:noctx
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tunneler/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Test_SyncerTunnelURL(t *testing.T) {
host: "https://host:9443/base",
ws: "myws",
target: "syncer001",
want: "https://host:9443/base/services/syncer-tunnels/clusters/myws/apis/workload.kcp.io/v1alpha1/synctargets/syncer001",
want: "https://host:9443/base/clusters/myws/apis/workload.kcp.io/v1alpha1/synctargets/syncer001",
},
{
name: "invalid host scheme",
Expand Down
Loading