Skip to content

Commit

Permalink
[v14] Use the correct error when inspecting Kubernetes session (#33950)
Browse files Browse the repository at this point in the history
* Use the correct error when inspecting Kubernetes session

This PR fixes the audit log report for Kubernetes sessions that returned
errors. The previous functions was using the incorrect error variable.

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* append error to the session recorder

* correct session leave code

* avoid emisleading debug error when owner joins the session

---------

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
tigrato authored Oct 27, 2023
1 parent a530389 commit 9b38f82
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 28 deletions.
9 changes: 7 additions & 2 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
client := &websocketClientStreams{stream}
party := newParty(*ctx, stream.Mode, client)

err = session.join(party)
err = session.join(party, true /* emitSessionJoinEvent */)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -1581,6 +1581,9 @@ func exitCode(err error) (errMsg, code string) {
return
}
errMsg = kubeStatusErr.ErrStatus.Message
if errMsg == "" {
errMsg = string(kubeStatusErr.ErrStatus.Reason)
}
code = strconv.Itoa(int(kubeStatusErr.ErrStatus.Code))
} else if errors.As(err, &kubeExecErr) {
if kubeExecErr.Err != nil {
Expand Down Expand Up @@ -1691,7 +1694,9 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.
}

f.setSession(session.id, session)
err = session.join(party)
// When Teleport attaches the original session creator terminal streams to the
// session, we don't wan't to emmit session.join event since it won't be required.
err = session.join(party, false /* emitSessionJoinEvent */)
if err != nil {
// This error must be forwarded to SPDY error stream, otherwise the client
// will hang waiting for the response.
Expand Down
77 changes: 52 additions & 25 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,24 @@ func (s *session) launch() error {

s.io.On()
if err = executor.StreamWithContext(s.streamContext, options); err != nil {
s.reportErrorToSessionRecorder(err)
s.log.WithError(err).Warning("Executor failed while streaming.")
return trace.Wrap(err)
}
return nil
}

// reportErrorToSessionRecorder reports the error to the session recorder
// if it is set.
func (s *session) reportErrorToSessionRecorder(err error) {
if err == nil {
return
}
if s.recorder != nil {
fmt.Fprintf(s.recorder, "\n---\nSession exited with error: %v\n", err)
}
}

func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, eventPodMeta apievents.KubernetesPodMetadata) (func(error), error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -773,7 +785,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,

if errExec != nil {
execEvent.Code = events.ExecFailureCode
execEvent.Error, execEvent.ExitCode = exitCode(err)
execEvent.Error, execEvent.ExitCode = exitCode(errExec)
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, execEvent); err != nil {
Expand Down Expand Up @@ -833,7 +845,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,
}

// join attempts to connect a party to the session.
func (s *session) join(p *party) error {
func (s *session) join(p *party, emitJoinEvent bool) error {
if p.Ctx.User.GetName() != s.ctx.User.GetName() {
roles := p.Ctx.Checker.Roles()

Expand Down Expand Up @@ -863,27 +875,11 @@ func (s *session) join(p *party) error {
return trace.Wrap(err)
}

sessionJoinEvent := &apievents.SessionJoin{
Metadata: apievents.Metadata{
Type: events.SessionJoinEvent,
Code: events.SessionJoinCode,
ClusterName: s.ctx.teleportCluster.name,
},
KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{
KubernetesCluster: s.ctx.kubeClusterName,
KubernetesUsers: []string{},
KubernetesGroups: []string{},
KubernetesLabels: s.ctx.kubeClusterLabels,
},
SessionMetadata: s.getSessionMetadata(),
UserMetadata: p.Ctx.eventUserMetaWithLogin("root"),
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.params.ByName("podName"),
},
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit event.")
// we only want to emit the session.join when someone tries to join a session via
// tsh kube join and not when the original session owner terminal streams are
// connected to the Kubernetes session.
if emitJoinEvent {
s.emitSessionJoinEvent(p)
}

recentWrites := s.io.GetRecentHistory()
Expand Down Expand Up @@ -972,6 +968,37 @@ func (s *session) BroadcastMessage(format string, args ...any) {
}
}

// emitSessionJoinEvent emits a session.join audit event when a user joins
// the session.
// This function requires that the session must be active, otherwise audit logger
// will discard the event.
func (s *session) emitSessionJoinEvent(p *party) {
sessionJoinEvent := &apievents.SessionJoin{
Metadata: apievents.Metadata{
Type: events.SessionJoinEvent,
Code: events.SessionJoinCode,
ClusterName: s.ctx.teleportCluster.name,
},
KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{
KubernetesCluster: s.ctx.kubeClusterName,
// joining moderators, obervers and peers don't have any
// kubernetes metadata configured.
KubernetesUsers: []string{},
KubernetesGroups: []string{},
KubernetesLabels: s.ctx.kubeClusterLabels,
},
SessionMetadata: s.getSessionMetadata(),
UserMetadata: p.Ctx.eventUserMetaWithLogin("root"),
ConnectionMetadata: apievents.ConnectionMetadata{
RemoteAddr: s.params.ByName("podName"),
},
}

if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil {
s.forwarder.log.WithError(err).Warn("Failed to emit event.")
}
}

// leave removes a party from the session and returns if the party was still active
// in the session. If the party wasn't found, it returns false, nil.
func (s *session) leave(id uuid.UUID) (bool, error) {
Expand Down Expand Up @@ -1005,8 +1032,8 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) {

sessionLeaveEvent := &apievents.SessionLeave{
Metadata: apievents.Metadata{
Type: events.SessionJoinEvent,
Code: events.SessionJoinCode,
Type: events.SessionLeaveEvent,
Code: events.SessionLeaveCode,
ClusterName: s.ctx.teleportCluster.name,
},
SessionMetadata: s.getSessionMetadata(),
Expand Down
129 changes: 129 additions & 0 deletions lib/kube/proxy/sess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,152 @@ limitations under the License.
package proxy

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/remotecommand"

"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/authz"
"github.com/gravitational/teleport/lib/events"
testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
)

func TestSessionEndError(t *testing.T) {
t.Parallel()
var (
eventsResult []apievents.AuditEvent
eventsResultMutex sync.Mutex
)
const (
errorMessage = "request denied"
errorCode = http.StatusForbidden
)
kubeMock, err := testingkubemock.NewKubeAPIMock(
testingkubemock.WithExecError(
metav1.Status{
Status: metav1.StatusFailure,
Message: errorMessage,
Reason: metav1.StatusReasonForbidden,
Code: errorCode,
},
),
)
require.NoError(t, err)
t.Cleanup(func() { kubeMock.Close() })

// creates a Kubernetes service with a configured cluster pointing to mock api server
testCtx := SetupTestContext(
context.Background(),
t,
TestConfig{
Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}},
// collect all audit events
OnEvent: func(event apievents.AuditEvent) {
eventsResultMutex.Lock()
defer eventsResultMutex.Unlock()
eventsResult = append(eventsResult, event)
},
},
)

t.Cleanup(func() { require.NoError(t, testCtx.Close()) })

// create a user with access to kubernetes (kubernetes_user and kubernetes_groups specified)
user, _ := testCtx.CreateUserAndRole(
testCtx.Context,
t,
username,
RoleSpec{
Name: roleName,
KubeUsers: roleKubeUsers,
KubeGroups: roleKubeGroups,
})

// generate a kube client with user certs for auth
_, userRestConfig := testCtx.GenTestKubeClientTLSCert(
t,
user.GetName(),
kubeCluster,
)
require.NoError(t, err)

var (
stdinWrite = &bytes.Buffer{}
stdout = &bytes.Buffer{}
stderr = &bytes.Buffer{}
)

_, err = stdinWrite.Write(stdinContent)
require.NoError(t, err)

streamOpts := remotecommand.StreamOptions{
Stdin: io.NopCloser(stdinWrite),
Stdout: stdout,
Stderr: stderr,
Tty: false,
}

req, err := generateExecRequest(
generateExecRequestConfig{
addr: testCtx.KubeProxyAddress(),
podName: podName,
podNamespace: podNamespace,
containerName: podContainerName,
cmd: containerCommmandExecute, // placeholder for commands to execute in the dummy pod
options: streamOpts,
},
)
require.NoError(t, err)

exec, err := remotecommand.NewSPDYExecutor(userRestConfig, http.MethodPost, req.URL())
require.NoError(t, err)
err = exec.StreamWithContext(testCtx.Context, streamOpts)
require.Error(t, err)

// check that the session is ended with an error in audit log.
require.EventuallyWithT(t, func(t *assert.CollectT) {
eventsResultMutex.Lock()
defer eventsResultMutex.Unlock()
hasSessionEndEvent := false
hasSessionExecEvent := false
for _, event := range eventsResult {
if event.GetType() == events.SessionEndEvent {
hasSessionEndEvent = true
}
if event.GetType() != events.ExecEvent {
continue
}

execEvent, ok := event.(*apievents.Exec)
assert.True(t, ok)
assert.Equal(t, events.ExecFailureCode, execEvent.GetCode())
assert.Equal(t, strconv.Itoa(errorCode), execEvent.ExitCode)
assert.Equal(t, errorMessage, execEvent.Error)
hasSessionExecEvent = true
}
assert.Truef(t, hasSessionEndEvent, "session end event not found in audit log")
assert.Truef(t, hasSessionExecEvent, "session exec event not found in audit log")
}, 10*time.Second, 1*time.Second)
}

func Test_session_trackSession(t *testing.T) {
t.Parallel()
moderatedPolicy := &types.SessionTrackerPolicySet{
Expand Down
13 changes: 12 additions & 1 deletion lib/kube/proxy/testing/kube_server/kube_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ func WithGetPodError(status metav1.Status) Option {
}
}

// WithExecError sets the error to be returned by the Exec call
func WithExecError(status metav1.Status) Option {
return func(s *KubeMockServer) {
s.execPodError = &status
}
}

type deletedResource struct {
requestID string
kind string
Expand All @@ -113,6 +120,7 @@ type KubeMockServer struct {
CA []byte
deletedResources map[deletedResource][]string
getPodError *metav1.Status
execPodError *metav1.Status
mu sync.Mutex
}

Expand Down Expand Up @@ -221,7 +229,10 @@ func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr erro

func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp any, err error) {
q := req.URL.Query()

if s.execPodError != nil {
s.writeResponseError(w, nil, s.execPodError)
return nil, nil
}
request := remoteCommandRequest{
namespace: p.ByName("namespace"),
name: p.ByName("name"),
Expand Down

0 comments on commit 9b38f82

Please sign in to comment.