Skip to content

Commit

Permalink
cmd/k8s-operator,k8s-operator/session-recording: implement support fo…
Browse files Browse the repository at this point in the history
…r WebSocket protocol

Kubernetes currently supports two streaming protocols- SPDY and
WebSockets. WebSockets are replacing SPDY, see
kubernetes/enhancements#4006
Our 'kubectl exec' session recording was only supporting
SPDY.

This change:

- adds functionality to parse streaming sessions over WebSockets

- for sessions that the API server proxy has determined need to be
recorded, determines if the session is over SPDY or WebSockets and
invoke the relevant parser accordingly

- refactors the session recording logic into its own package

Updates tailscale/corp#19821

Signed-off-by: Irbe Krumina <irbe@tailscale.com>
  • Loading branch information
irbekrm committed Jul 26, 2024
1 parent 8d7b78f commit cbb7560
Show file tree
Hide file tree
Showing 15 changed files with 1,178 additions and 238 deletions.
75 changes: 52 additions & 23 deletions cmd/k8s-operator/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/client-go/transport"
"tailscale.com/client/tailscale"
"tailscale.com/client/tailscale/apitype"
sessionrecording "tailscale.com/k8s-operator/session-recording"
tskube "tailscale.com/kube"
"tailscale.com/ssh/tailssh"
"tailscale.com/tailcfg"
Expand All @@ -36,12 +37,6 @@ var whoIsKey = ctxkey.New("", (*apitype.WhoIsResponse)(nil))
var (
// counterNumRequestsproxies counts the number of API server requests proxied via this proxy.
counterNumRequestsProxied = clientmetric.NewCounter("k8s_auth_proxy_requests_proxied")

// counterSessionRecordingsAttempted counts the number of session recording attempts.
counterSessionRecordingsAttempted = clientmetric.NewCounter("k8s_auth_proxy__session_recordings_attempted")

// counterSessionRecordingsUploaded counts the number of successfully uploaded session recordings.
counterSessionRecordingsUploaded = clientmetric.NewCounter("k8s_auth_proxy_session_recordings_uploaded")
)

type apiServerProxyMode int
Expand Down Expand Up @@ -173,7 +168,9 @@ func runAPIServerProxy(ts *tsnet.Server, rt http.RoundTripper, log *zap.SugaredL

mux := http.NewServeMux()
mux.HandleFunc("/", ap.serveDefault)
mux.HandleFunc("/api/v1/namespaces/{namespace}/pods/{pod}/exec", ap.serveExec)
mux.HandleFunc("POST /api/v1/namespaces/{namespace}/pods/{pod}/exec", ap.serveExecSPDY)

mux.HandleFunc("GET /api/v1/namespaces/{namespace}/pods/{pod}/exec", ap.serveExecWS)

hs := &http.Server{
// Kubernetes uses SPDY for exec and port-forward, however SPDY is
Expand Down Expand Up @@ -216,7 +213,7 @@ func (ap *apiserverProxy) serveDefault(w http.ResponseWriter, r *http.Request) {

// serveExec serves 'kubectl exec' requests, optionally configuring the kubectl
// exec sessions to be recorded.
func (ap *apiserverProxy) serveExec(w http.ResponseWriter, r *http.Request) {
func (ap *apiserverProxy) serveExecWS(w http.ResponseWriter, r *http.Request) {
who, err := ap.whoIs(r)
if err != nil {
ap.authError(w, err)
Expand All @@ -232,14 +229,58 @@ func (ap *apiserverProxy) serveExec(w http.ResponseWriter, r *http.Request) {
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
return
}
counterSessionRecordingsAttempted.Add(1) // at this point we know that users intended for this session to be recorded
sessionrecording.CounterSessionRecordingsAttempted.Add(1) // at this point we know that users intended for this session to be recorded
if !failOpen && len(addrs) == 0 {
msg := "forbidden: 'kubectl exec' session must be recorded, but no recorders are available."
ap.log.Error(msg)
http.Error(w, msg, http.StatusForbidden)
return
}
if r.Method != "POST" || r.Header.Get("Upgrade") != "SPDY/3.1" {
if h := r.Header.Get("Upgrade"); h != "websocket" {
msg := fmt.Sprintf("[unexpected] 'kubectl exec' session was initiated for WebSocket protocol, but the request does not contain expected upgrade header, wants: 'websocket', got: %q", h)
if failOpen {
msg = msg + "; failure mode is 'fail open'; continuing session without recording."
ap.log.Warn(msg)
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
return
}
ap.log.Error(msg)
msg += "; failure mode is 'fail closed'; closing connection."
http.Error(w, msg, 403)
return
} else {
ap.log.Infof("websocket upgrade")
}
wsH := sessionrecording.New(ap.ts, r, who, w, r.PathValue("pod"), r.PathValue("namespace"), sessionrecording.WebSocketsProtocol, addrs, failOpen, tailssh.ConnectToRecorder, ap.log)

ap.rp.ServeHTTP(wsH, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
}

// serveExec serves 'kubectl exec' requests, optionally configuring the kubectl
// exec sessions to be recorded.
func (ap *apiserverProxy) serveExecSPDY(w http.ResponseWriter, r *http.Request) {
who, err := ap.whoIs(r)
if err != nil {
ap.authError(w, err)
return
}
counterNumRequestsProxied.Add(1)
failOpen, addrs, err := determineRecorderConfig(who)
if err != nil {
ap.log.Errorf("error trying to determine whether the 'kubectl exec' session needs to be recorded: %v", err)
return
}
if failOpen && len(addrs) == 0 { // will not record
ap.rp.ServeHTTP(w, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
return
}
if !failOpen && len(addrs) == 0 {
msg := "forbidden: 'kubectl exec' session must be recorded, but no recorders are available."
ap.log.Error(msg)
http.Error(w, msg, 403)
return
}
if r.Header.Get("Upgrade") != "SPDY/3.1" {
msg := "'kubectl exec' session recording is configured, but the request is not over SPDY. Session recording is currently only supported for SPDY based clients"
if failOpen {
msg = msg + "; failure mode is 'fail open'; continuing session without recording."
Expand All @@ -252,19 +293,7 @@ func (ap *apiserverProxy) serveExec(w http.ResponseWriter, r *http.Request) {
http.Error(w, msg, http.StatusForbidden)
return
}
spdyH := &spdyHijacker{
ts: ap.ts,
req: r,
who: who,
ResponseWriter: w,
log: ap.log,
pod: r.PathValue("pod"),
ns: r.PathValue("namespace"),
addrs: addrs,
failOpen: failOpen,
connectToRecorder: tailssh.ConnectToRecorder,
}

spdyH := sessionrecording.New(ap.ts, r, who, w, r.PathValue("pod"), r.PathValue("namespace"), sessionrecording.SPDYProtocol, addrs, failOpen, tailssh.ConnectToRecorder, ap.log)
ap.rp.ServeHTTP(spdyH, r.WithContext(whoIsKey.WithValue(r.Context(), who)))
}

Expand Down
117 changes: 117 additions & 0 deletions k8s-operator/session-recording/fakes/fakes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

//go:build !plan9

// package fakes contains utils for testing session recording behaviour.
package fakes

import (
"bytes"
"encoding/json"
"net"
"sync"
"testing"

"tailscale.com/k8s-operator/session-recording/tsrecorder"
"tailscale.com/tstime"
)

func New(conn net.Conn, wb bytes.Buffer, rb bytes.Buffer, closed bool) net.Conn {
return &TestConn{
Conn: conn,
writeBuf: wb,
readBuf: rb,
closed: closed,
}
}

type TestConn struct {
net.Conn
// writeBuf contains whatever was send to the conn via Write.
writeBuf bytes.Buffer
// readBuf contains whatever was sent to the conn via Read.
readBuf bytes.Buffer
sync.RWMutex // protects the following
closed bool
}

var _ net.Conn = &TestConn{}

func (tc *TestConn) Read(b []byte) (int, error) {
return tc.readBuf.Read(b)
}

func (tc *TestConn) Write(b []byte) (int, error) {
return tc.writeBuf.Write(b)
}

func (tc *TestConn) Close() error {
tc.Lock()
defer tc.Unlock()
tc.closed = true
return nil
}

func (tc *TestConn) IsClosed() bool {
tc.Lock()
defer tc.Unlock()
return tc.closed
}

func (tc *TestConn) WriteBufBytes() []byte {
return tc.writeBuf.Bytes()
}

func (tc *TestConn) ResetReadBuf() {
tc.readBuf.Reset()
}

func (tc *TestConn) WriteReadBufBytes(b []byte) error {
_, err := tc.readBuf.Write(b)
return err
}

type TestSessionRecorder struct {
// buf holds data that was sent to the session recorder.
buf bytes.Buffer
}

func (t *TestSessionRecorder) Write(b []byte) (int, error) {
return t.buf.Write(b)
}

func (t *TestSessionRecorder) Close() error {
t.buf.Reset()
return nil
}

func (t *TestSessionRecorder) Bytes() []byte {
return t.buf.Bytes()
}

func CastLine(t *testing.T, p []byte, clock tstime.Clock) []byte {
t.Helper()
j, err := json.Marshal([]any{
clock.Now().Sub(clock.Now()).Seconds(),
"o",
string(p),
})
if err != nil {
t.Fatalf("error marshalling cast line: %v", err)
}
return append(j, '\n')
}

func AsciinemaResizeMsg(t *testing.T, width, height int) []byte {
t.Helper()
ch := tsrecorder.CastHeader{
Width: width,
Height: height,
}
bs, err := json.Marshal(ch)
if err != nil {
t.Fatalf("error marshalling CastHeader: %v", err)
}
return append(bs, '\n')
}
Loading

0 comments on commit cbb7560

Please sign in to comment.