Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable and fix AuditOn. #17687

Merged
merged 4 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion integration/helpers/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func NewFixture(t *testing.T) *Fixture {
require.NoError(t, err)

// Find AllocatePortsNum free listening ports to use.
fixture.Me, _ = user.Current()
fixture.Me, err = user.Current()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to the fix itself, but it bit me at some point, so I decided to fix it.

require.NoError(t, err)

// close & re-open stdin because 'go test' runs with os.stdin connected to /dev/null
stdin, err := os.Open("/dev/tty")
Expand Down
25 changes: 13 additions & 12 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,6 @@ func testAuthLocalNodeControlStream(t *testing.T, suite *integrationTestSuite) {
// testAuditOn creates a live session, records a bunch of data through it
// and then reads it back and compares against simulated reality.
func testAuditOn(t *testing.T, suite *integrationTestSuite) {
// TODO(jakule): Re-enable once the fix is ready.
t.Skip("This test is flaky, skip for now.")

ctx := context.Background()

tr := utils.NewTracer(utils.ThisFunction()).Start()
defer tr.Stop()

tests := []struct {
comment string
inRecordLocation string
Expand Down Expand Up @@ -327,6 +319,11 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {

for _, tt := range tests {
t.Run(tt.comment, func(t *testing.T) {
tr := utils.NewTracer(utils.ThisFunction()).Start()
t.Cleanup(func() {
tr.Stop()
})

makeConfig := func() (*testing.T, []string, []*helpers.InstanceSecrets, *service.Config) {
auditConfig, err := types.NewClusterAuditConfig(types.ClusterAuditConfigSpecV2{
AuditSessionsURI: tt.auditSessionsURI,
Expand All @@ -349,7 +346,10 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
return t, nil, nil, tconf
}
teleport := suite.NewTeleportWithConfig(makeConfig())
defer teleport.StopAll()
t.Cleanup(func() {
err := teleport.StopAll()
require.NoError(t, err)
})

// Start a node.
nodeConf := suite.defaultServiceConfig()
Expand All @@ -364,6 +364,8 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
site := teleport.GetSiteAPI(helpers.Site)
require.NotNil(t, site)

ctx := context.Background()

// wait 10 seconds for both nodes to show up, otherwise
// we'll have trouble connecting to the node below.
waitForNodes := func(site auth.ClientI, count int) error {
Expand Down Expand Up @@ -437,9 +439,8 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
// make sure it's us who joined! :)
require.Equal(t, suite.Me.Username, tracker.GetParticipants()[0].User)

// lets type "echo hi" followed by "enter" and then "exit" + "enter":

myTerm.Type("\aecho hi\n\r\aexit\n\r\a")
// let's type "echo hi" followed by "enter" and then "exit" + "enter":
myTerm.Type("echo hi\n\rexit\n\r")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

\a suspends the terminal for a second

time.Sleep(time.Second)

I don't see the point in waiting.


// wait for session to end:
select {
Expand Down
7 changes: 0 additions & 7 deletions lib/srv/reexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,6 @@ func RunCommand() (errw io.Writer, code int, err error) {
if err != nil {
return errorWriter, teleport.RemoteCommandFailure, trace.Wrap(err)
}
defer func() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to close TTY and PTY before we close the process. This cleanup logic was introduced a few months ago #13491
After this function returns, the process ends anyway, so all file descriptors will be closed anyways.

for _, f := range cmd.ExtraFiles {
if err := f.Close(); err != nil {
log.WithError(err).Warn("Error closing extra file.")
}
}
}()

// Wait until the continue signal is received from Teleport signaling that
// the child process has been placed in a cgroup.
Expand Down
16 changes: 9 additions & 7 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,12 @@ func (s *session) startInteractive(ctx context.Context, ch ssh.Channel, scx *Ser
s.log.WithError(err).Error("Received error waiting for the interactive session to finish")
}

if result != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this block above the select. The result should be returned before we finish the session. Otherwise, we may miss the exit code.

if err := s.registry.broadcastResult(s.id, *result); err != nil {
s.log.Warningf("Failed to broadcast session result: %v", err)
}
}

// wait for copying from the pty to be complete or a timeout before
// broadcasting the result (which will close the pty) if it has not been
// closed already.
Expand All @@ -1013,14 +1019,10 @@ func (s *session) startInteractive(ctx context.Context, ch ssh.Channel, scx *Ser
emitExecAuditEvent(scx, execRequest.GetCommand(), err)
}

if result != nil {
if err := s.registry.broadcastResult(s.id, *result); err != nil {
s.log.Warningf("Failed to broadcast session result: %v", err)
}
}

s.emitSessionEndEvent()
s.Close()
if err := s.Close(); err != nil {
s.log.Warnf("Failed to close session: %v", err)
}
}()

return nil
Expand Down
26 changes: 19 additions & 7 deletions lib/srv/term.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,8 @@ func (t *terminal) AddParty(delta int) {
}

// Run will run the terminal.
func (t *terminal) Run(ctx context.Context) error {
func (t *terminal) Run(_ context.Context) error {
var err error
defer t.closeTTY()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're closing the TTY in two different places anyway. Closing it here causes the return code to be "skipped".


// Create the command that will actually execute.
t.cmd, err = ConfigureCommand(t.ctx)
if err != nil {
Expand Down Expand Up @@ -224,7 +222,9 @@ func (t *terminal) Wait() (*ExecResult, error) {
// Continue will resume execution of the process after it completes its
// pre-processing routine (placed in a cgroup).
func (t *terminal) Continue() {
t.ctx.contw.Close()
if err := t.ctx.contw.Close(); err != nil {
t.log.Warnf("failed to close server context")
}
}

// Kill will force kill the terminal.
Expand Down Expand Up @@ -269,10 +269,14 @@ func (t *terminal) Close() error {
}

func (t *terminal) closeTTY() error {
t.log.Debugf("Closing TTY")
defer t.log.Debugf("Closed TTY")

t.mu.Lock()
defer t.mu.Unlock()

if t.tty == nil {
t.log.Debugf("TTY already closed")
return nil
}

Expand All @@ -287,14 +291,21 @@ func (t *terminal) closeTTY() error {
}

func (t *terminal) closePTY() {
defer t.log.Debugf("Closed PTY")

t.mu.Lock()
defer t.mu.Unlock()
defer t.log.Debugf("Closed PTY")

// wait until all copying is over (all participants have left)
t.wg.Wait()

t.pty.Close()
if t.pty == nil {
return
}

if err := t.pty.Close(); err != nil {
t.log.Warnf("Failed to close PTY: %v", err)
}
t.pty = nil
}

Expand Down Expand Up @@ -458,7 +469,7 @@ func (b *ptyBuffer) Write(p []byte) (n int, err error) {
}

func (t *remoteTerminal) Run(ctx context.Context) error {
// prepare the remote remote session by setting environment variables
// prepare the remote session by setting environment variables
t.prepareRemoteSession(ctx, t.session, t.ctx)

// combine stdout and stderr
Expand Down Expand Up @@ -560,6 +571,7 @@ func (t *remoteTerminal) PID() int {
}

func (t *remoteTerminal) Close() error {
t.wg.Wait()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This maybe controversial, but this t.wg is not being used anywhere, and from what I see we should wait on it here (the same as terminal).

// this closes the underlying stdin,stdout,stderr which is what ptyBuffer is
// hooked to directly
err := t.session.Close()
Expand Down