diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go
index 08a57a756226d..b90909dc72b4d 100644
--- a/lib/kube/proxy/forwarder.go
+++ b/lib/kube/proxy/forwarder.go
@@ -1205,7 +1205,7 @@ func (f *Forwarder) join(ctx *authContext, w http.ResponseWriter, req *http.Requ
 	client := &websocketClientStreams{stream}
 	party := newParty(*ctx, stream.Mode, client)
 
-	err = session.join(party)
+	err = session.join(party, true /* emitSessionJoinEvent */)
 	if err != nil {
 		return trace.Wrap(err)
 	}
@@ -1581,6 +1581,9 @@ func exitCode(err error) (errMsg, code string) {
 			return
 		}
 		errMsg = kubeStatusErr.ErrStatus.Message
+		if errMsg == "" {
+			errMsg = string(kubeStatusErr.ErrStatus.Reason)
+		}
 		code = strconv.Itoa(int(kubeStatusErr.ErrStatus.Code))
 	} else if errors.As(err, &kubeExecErr) {
 		if kubeExecErr.Err != nil {
@@ -1691,7 +1694,9 @@ func (f *Forwarder) exec(authCtx *authContext, w http.ResponseWriter, req *http.
 	}
 	f.setSession(session.id, session)
 
-	err = session.join(party)
+	// When Teleport attaches the original session creator's terminal streams to the
+	// session, we don't want to emit a session.join event since it isn't required.
+	err = session.join(party, false /* emitSessionJoinEvent */)
 	if err != nil {
 		// This error must be forwarded to SPDY error stream, otherwise the client
 		// will hang waiting for the response.
diff --git a/lib/kube/proxy/sess.go b/lib/kube/proxy/sess.go
index 79c1537fa8b68..37232ed3e63e6 100644
--- a/lib/kube/proxy/sess.go
+++ b/lib/kube/proxy/sess.go
@@ -610,12 +610,24 @@ func (s *session) launch() error {
 	s.io.On()
 	if err = executor.StreamWithContext(s.streamContext, options); err != nil {
+		s.reportErrorToSessionRecorder(err)
 		s.log.WithError(err).Warning("Executor failed while streaming.")
 		return trace.Wrap(err)
 	}
 
 	return nil
 }
 
+// reportErrorToSessionRecorder reports the error to the session recorder
+// if it is set.
+func (s *session) reportErrorToSessionRecorder(err error) {
+	if err == nil {
+		return
+	}
+	if s.recorder != nil {
+		fmt.Fprintf(s.recorder, "\n---\nSession exited with error: %v\n", err)
+	}
+}
+
 func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values, eventPodMeta apievents.KubernetesPodMetadata) (func(error), error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -773,7 +785,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,
 
 		if errExec != nil {
 			execEvent.Code = events.ExecFailureCode
-			execEvent.Error, execEvent.ExitCode = exitCode(err)
+			execEvent.Error, execEvent.ExitCode = exitCode(errExec)
 		}
 
 		if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, execEvent); err != nil {
@@ -833,7 +845,7 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, q url.Values,
 }
 
 // join attempts to connect a party to the session.
-func (s *session) join(p *party) error {
+func (s *session) join(p *party, emitJoinEvent bool) error {
 	if p.Ctx.User.GetName() != s.ctx.User.GetName() {
 		roles := p.Ctx.Checker.Roles()
 
@@ -863,27 +875,11 @@ func (s *session) join(p *party) error {
 		return trace.Wrap(err)
 	}
 
-	sessionJoinEvent := &apievents.SessionJoin{
-		Metadata: apievents.Metadata{
-			Type:        events.SessionJoinEvent,
-			Code:        events.SessionJoinCode,
-			ClusterName: s.ctx.teleportCluster.name,
-		},
-		KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{
-			KubernetesCluster: s.ctx.kubeClusterName,
-			KubernetesUsers:   []string{},
-			KubernetesGroups:  []string{},
-			KubernetesLabels:  s.ctx.kubeClusterLabels,
-		},
-		SessionMetadata:    s.getSessionMetadata(),
-		UserMetadata:       p.Ctx.eventUserMetaWithLogin("root"),
-		ConnectionMetadata: apievents.ConnectionMetadata{
-			RemoteAddr: s.params.ByName("podName"),
-		},
-	}
-
-	if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil {
-		s.forwarder.log.WithError(err).Warn("Failed to emit event.")
+	// We only want to emit the session.join event when someone joins the session via
+	// tsh kube join, and not when the original session owner's terminal streams are
+	// connected to the Kubernetes session.
+	if emitJoinEvent {
+		s.emitSessionJoinEvent(p)
 	}
 
 	recentWrites := s.io.GetRecentHistory()
@@ -972,6 +968,37 @@ func (s *session) BroadcastMessage(format string, args ...any) {
 	}
 }
 
+// emitSessionJoinEvent emits a session.join audit event when a user joins
+// the session.
+// This function requires the session to be active; otherwise, the audit logger
+// will discard the event.
+func (s *session) emitSessionJoinEvent(p *party) {
+	sessionJoinEvent := &apievents.SessionJoin{
+		Metadata: apievents.Metadata{
+			Type:        events.SessionJoinEvent,
+			Code:        events.SessionJoinCode,
+			ClusterName: s.ctx.teleportCluster.name,
+		},
+		KubernetesClusterMetadata: apievents.KubernetesClusterMetadata{
+			KubernetesCluster: s.ctx.kubeClusterName,
+			// joining moderators, observers, and peers don't have any
+			// Kubernetes metadata configured.
+			KubernetesUsers:  []string{},
+			KubernetesGroups: []string{},
+			KubernetesLabels: s.ctx.kubeClusterLabels,
+		},
+		SessionMetadata: s.getSessionMetadata(),
+		UserMetadata:    p.Ctx.eventUserMetaWithLogin("root"),
+		ConnectionMetadata: apievents.ConnectionMetadata{
+			RemoteAddr: s.params.ByName("podName"),
+		},
+	}
+
+	if err := s.emitter.EmitAuditEvent(s.forwarder.ctx, sessionJoinEvent); err != nil {
+		s.forwarder.log.WithError(err).Warn("Failed to emit event.")
+	}
+}
+
 // leave removes a party from the session and returns if the party was still active
 // in the session. If the party wasn't found, it returns false, nil.
 func (s *session) leave(id uuid.UUID) (bool, error) {
@@ -1005,8 +1032,8 @@ func (s *session) unlockedLeave(id uuid.UUID) (bool, error) {
 
 	sessionLeaveEvent := &apievents.SessionLeave{
 		Metadata: apievents.Metadata{
-			Type:        events.SessionJoinEvent,
-			Code:        events.SessionJoinCode,
+			Type:        events.SessionLeaveEvent,
+			Code:        events.SessionLeaveCode,
 			ClusterName: s.ctx.teleportCluster.name,
 		},
 		SessionMetadata: s.getSessionMetadata(),
diff --git a/lib/kube/proxy/sess_test.go b/lib/kube/proxy/sess_test.go
index e119c91988fe2..34a331de9217d 100644
--- a/lib/kube/proxy/sess_test.go
+++ b/lib/kube/proxy/sess_test.go
@@ -17,23 +17,152 @@ limitations under the License.
 package proxy
 
 import (
+	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"net/http"
 	"net/url"
+	"strconv"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/google/uuid"
 	"github.com/gravitational/trace"
 	"github.com/jonboulle/clockwork"
 	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/tools/remotecommand"
 
 	"github.com/gravitational/teleport/api/types"
+	apievents "github.com/gravitational/teleport/api/types/events"
 	"github.com/gravitational/teleport/lib/auth"
 	"github.com/gravitational/teleport/lib/authz"
+	"github.com/gravitational/teleport/lib/events"
+	testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
 )
 
+func TestSessionEndError(t *testing.T) {
+	t.Parallel()
+	var (
+		eventsResult      []apievents.AuditEvent
+		eventsResultMutex sync.Mutex
+	)
+	const (
+		errorMessage = "request denied"
+		errorCode    = http.StatusForbidden
+	)
+	kubeMock, err := testingkubemock.NewKubeAPIMock(
+		testingkubemock.WithExecError(
+			metav1.Status{
+				Status:  metav1.StatusFailure,
+				Message: errorMessage,
+				Reason:  metav1.StatusReasonForbidden,
+				Code:    errorCode,
+			},
+		),
+	)
+	require.NoError(t, err)
+	t.Cleanup(func() { kubeMock.Close() })
+
+	// creates a Kubernetes service with a configured cluster pointing to the mock API server
+	testCtx := SetupTestContext(
+		context.Background(),
+		t,
+		TestConfig{
+			Clusters: []KubeClusterConfig{{Name: kubeCluster, APIEndpoint: kubeMock.URL}},
+			// collect all audit events
+			OnEvent: func(event apievents.AuditEvent) {
+				eventsResultMutex.Lock()
+				defer eventsResultMutex.Unlock()
+				eventsResult = append(eventsResult, event)
+			},
+		},
+	)
+
+	t.Cleanup(func() { require.NoError(t, testCtx.Close()) })
+
+	// create a user with access to kubernetes (kubernetes_user and kubernetes_groups specified)
+	user, _ := testCtx.CreateUserAndRole(
+		testCtx.Context,
+		t,
+		username,
+		RoleSpec{
+			Name:       roleName,
+			KubeUsers:  roleKubeUsers,
+			KubeGroups: roleKubeGroups,
+		})
+
+	// generate a kube client with user certs for auth
+	_, userRestConfig := testCtx.GenTestKubeClientTLSCert(
+		t,
+		user.GetName(),
+		kubeCluster,
+	)
+	require.NoError(t, err)
+
+	var (
+		stdinWrite = &bytes.Buffer{}
+		stdout     = &bytes.Buffer{}
+		stderr     = &bytes.Buffer{}
+	)
+
+	_, err = stdinWrite.Write(stdinContent)
+	require.NoError(t, err)
+
+	streamOpts := remotecommand.StreamOptions{
+		Stdin:  io.NopCloser(stdinWrite),
+		Stdout: stdout,
+		Stderr: stderr,
+		Tty:    false,
+	}
+
+	req, err := generateExecRequest(
+		generateExecRequestConfig{
+			addr:          testCtx.KubeProxyAddress(),
+			podName:       podName,
+			podNamespace:  podNamespace,
+			containerName: podContainerName,
+			cmd:           containerCommmandExecute, // placeholder for commands to execute in the dummy pod
+			options:       streamOpts,
+		},
+	)
+	require.NoError(t, err)
+
+	exec, err := remotecommand.NewSPDYExecutor(userRestConfig, http.MethodPost, req.URL())
+	require.NoError(t, err)
+	err = exec.StreamWithContext(testCtx.Context, streamOpts)
+	require.Error(t, err)
+
+	// check that the session ended with an error in the audit log.
+	require.EventuallyWithT(t, func(t *assert.CollectT) {
+		eventsResultMutex.Lock()
+		defer eventsResultMutex.Unlock()
+		hasSessionEndEvent := false
+		hasSessionExecEvent := false
+		for _, event := range eventsResult {
+			if event.GetType() == events.SessionEndEvent {
+				hasSessionEndEvent = true
+			}
+			if event.GetType() != events.ExecEvent {
+				continue
+			}
+
+			execEvent, ok := event.(*apievents.Exec)
+			assert.True(t, ok)
+			assert.Equal(t, events.ExecFailureCode, execEvent.GetCode())
+			assert.Equal(t, strconv.Itoa(errorCode), execEvent.ExitCode)
+			assert.Equal(t, errorMessage, execEvent.Error)
+			hasSessionExecEvent = true
+		}
+		assert.Truef(t, hasSessionEndEvent, "session end event not found in audit log")
+		assert.Truef(t, hasSessionExecEvent, "session exec event not found in audit log")
+	}, 10*time.Second, 1*time.Second)
+}
+
 func Test_session_trackSession(t *testing.T) {
 	t.Parallel()
 	moderatedPolicy := &types.SessionTrackerPolicySet{
diff --git a/lib/kube/proxy/testing/kube_server/kube_mock.go b/lib/kube/proxy/testing/kube_server/kube_mock.go
index ac58ada29f790..bd2037227fabe 100644
--- a/lib/kube/proxy/testing/kube_server/kube_mock.go
+++ b/lib/kube/proxy/testing/kube_server/kube_mock.go
@@ -99,6 +99,13 @@ func WithGetPodError(status metav1.Status) Option {
 	}
 }
 
+// WithExecError sets the error to be returned by the Exec call
+func WithExecError(status metav1.Status) Option {
+	return func(s *KubeMockServer) {
+		s.execPodError = &status
+	}
+}
+
 type deletedResource struct {
 	requestID string
 	kind      string
@@ -113,6 +120,7 @@ type KubeMockServer struct {
 	CA               []byte
 	deletedResources map[deletedResource][]string
 	getPodError      *metav1.Status
+	execPodError     *metav1.Status
 	mu               sync.Mutex
 }
 
@@ -221,7 +229,10 @@ func (s *KubeMockServer) writeResponseError(rw http.ResponseWriter, respErr erro
 
 func (s *KubeMockServer) exec(w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp any, err error) {
 	q := req.URL.Query()
-
+	if s.execPodError != nil {
+		s.writeResponseError(w, nil, s.execPodError)
+		return nil, nil
+	}
 	request := remoteCommandRequest{
 		namespace: p.ByName("namespace"),
 		name:      p.ByName("name"),
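
For illustration only (not part of the diff): a minimal, standalone sketch of the fallback behavior that the exitCode change in forwarder.go introduces. When a Kubernetes StatusError carries an empty Message, the status Reason is reported instead, and the HTTP status code becomes the exit code. The helper name statusExitInfo below is hypothetical and simply mirrors the added logic.

package main

import (
	"fmt"
	"strconv"

	kubeerrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// statusExitInfo is a hypothetical helper mirroring the fallback added to exitCode:
// prefer the status Message, fall back to the Reason when the Message is empty,
// and report the HTTP status code as the exit code.
func statusExitInfo(statusErr *kubeerrors.StatusError) (errMsg, code string) {
	errMsg = statusErr.ErrStatus.Message
	if errMsg == "" {
		errMsg = string(statusErr.ErrStatus.Reason)
	}
	code = strconv.Itoa(int(statusErr.ErrStatus.Code))
	return errMsg, code
}

func main() {
	// A Forbidden status with no Message: the Reason ("Forbidden") is reported instead.
	statusErr := &kubeerrors.StatusError{ErrStatus: metav1.Status{
		Status: metav1.StatusFailure,
		Reason: metav1.StatusReasonForbidden,
		Code:   403,
	}}
	msg, code := statusExitInfo(statusErr)
	fmt.Println(msg, code) // prints: Forbidden 403
}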