Skip to content

Commit

Permalink
Deflake TestAgentForward (#13166)
Browse files Browse the repository at this point in the history
* Attempt to deflake TestAgentForward

This was another case of the t.TempDir() being cleaned up while the
audit logger is still writing to the directory, which happens when
tests don't properly clean up after themselves. Ensure that any
services we spin up are closed via cleanup actions.

* Prevent the use of disk-based logging in TestAgentForward

The disk-based logger runs a background process to complete uploads,
which occasionally fails to finish before the test cleanup tries
to remove the temporary directory.

There are two ways to prevent the use of a disk-based logger:

1. Set IsTestStub on the SSH *ServerContext
2. Use a sync session recording mode

Option 2 was selected, because the ServerContext is created by the SSH
server instead of the test, so plumbing that value through would be
a larger change, and I generally dislike test specific modes that can
be mistakenly enabled in non-test situations.

Additionally, update the lib/srv/regular test fixture to allow for
configuring the audit log to use. This allows us to set up a discarding
logger, since these tests are about agent forwarding behavior and not
audit logging.
  • Loading branch information
zmb3 committed Jun 13, 2022
1 parent 1471d35 commit 0e8405b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
25 changes: 15 additions & 10 deletions lib/auth/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type TestAuthServerConfig struct {
ClusterNetworkingConfig types.ClusterNetworkingConfig
// Streamer allows a test to set its own audit events streamer.
Streamer events.Streamer
// AuditLog allows a test to configure its own audit log.
AuditLog events.IAuditLog
}

// CheckAndSetDefaults checks and sets defaults
Expand Down Expand Up @@ -208,16 +210,19 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {
// Wrap backend in sanitizer like in production.
srv.Backend = backend.NewSanitizer(b)

localLog, err := events.NewAuditLog(events.AuditLogConfig{
DataDir: cfg.Dir,
RecordSessions: true,
ServerID: cfg.ClusterName,
UploadHandler: events.NewMemoryUploader(),
})
if err != nil {
return nil, trace.Wrap(err)
if cfg.AuditLog != nil {
srv.AuditLog = cfg.AuditLog
} else {
localLog, err := events.NewAuditLog(events.AuditLogConfig{
DataDir: cfg.Dir,
ServerID: cfg.ClusterName,
UploadHandler: events.NewMemoryUploader(),
})
if err != nil {
return nil, trace.Wrap(err)
}
srv.AuditLog = localLog
}
srv.AuditLog = localLog

srv.SessionServer, err = session.New(srv.Backend)
if err != nil {
Expand All @@ -235,7 +240,7 @@ func NewTestAuthServer(cfg TestAuthServerConfig) (*TestAuthServer, error) {
AuditLog: srv.AuditLog,
Streamer: cfg.Streamer,
SkipPeriodicOperations: true,
Emitter: localLog,
Emitter: srv.AuditLog,
}, WithClock(cfg.Clock))
if err != nil {
return nil, trace.Wrap(err)
Expand Down
2 changes: 1 addition & 1 deletion lib/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (cfg *ProtoStreamConfig) CheckAndSetDefaults() error {
return nil
}

// NewProtoStream uploads session recordings to the protobuf format.
// NewProtoStream uploads session recordings in the protobuf format.
//
// The individual session stream is represented by continuous globally
// ordered sequence of events serialized to binary protobuf format.
Expand Down
44 changes: 28 additions & 16 deletions lib/srv/regular/sshserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/bpf"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/limiter"
"github.com/gravitational/teleport/lib/pam"
restricted "github.com/gravitational/teleport/lib/restrictedsession"
Expand Down Expand Up @@ -156,6 +157,7 @@ func newCustomFixture(t *testing.T, mutateCfg func(*auth.TestServerConfig), sshO
},
})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, nodeClient.Close()) })

nodeDir := t.TempDir()
serverOptions := []ServerOption{
Expand Down Expand Up @@ -195,7 +197,10 @@ func newCustomFixture(t *testing.T, mutateCfg func(*auth.TestServerConfig), sshO
require.NoError(t, err)
require.NoError(t, auth.CreateUploaderDir(nodeDir))
require.NoError(t, sshSrv.Start())
t.Cleanup(func() { require.NoError(t, sshSrv.Close()) })
t.Cleanup(func() {
require.NoError(t, sshSrv.Close())
sshSrv.Wait()
})

require.NoError(t, sshSrv.heartbeat.ForceSend(time.Second))

Expand Down Expand Up @@ -244,7 +249,6 @@ func newCustomFixture(t *testing.T, mutateCfg func(*auth.TestServerConfig), sshO

t.Cleanup(func() { f.ssh.assertCltClose(t, client.Close()) })
require.NoError(t, agent.ForwardToAgent(client, keyring))

return f
}

Expand Down Expand Up @@ -591,7 +595,9 @@ func TestOpenExecSessionSetsSession(t *testing.T) {
// TestAgentForward tests agent forwarding via unix sockets
func TestAgentForward(t *testing.T) {
t.Parallel()
f := newFixture(t)
f := newCustomFixture(t, func(cfg *auth.TestServerConfig) {
cfg.Auth.AuditLog = events.NewDiscardAuditLog()
})

ctx := context.Background()
roleName := services.RoleNameForUser(f.user)
Expand All @@ -603,29 +609,36 @@ func TestAgentForward(t *testing.T) {
err = f.testSrv.Auth().UpsertRole(ctx, role)
require.NoError(t, err)

// use a sync recording mode because the disk-based uploader
// that runs in the background introduces races with test cleanup
recConfig := types.DefaultSessionRecordingConfig()
recConfig.SetMode(types.RecordAtNodeSync)
err = f.testSrv.Auth().SetSessionRecordingConfig(ctx, recConfig)
require.NoError(t, err)

se, err := f.ssh.clt.NewSession()
require.NoError(t, err)
defer se.Close()
t.Cleanup(func() { se.Close() })

err = agent.RequestAgentForwarding(se)
require.NoError(t, err)

// prepare to send virtual "keyboard input" into the shell:
keyboard, err := se.StdinPipe()
require.NoError(t, err)
t.Cleanup(func() { keyboard.Close() })

// start interactive SSH session (new shell):
err = se.Shell()
require.NoError(t, err)

// create a temp file to collect the shell output into:
tmpFile, err := ioutil.TempFile(os.TempDir(), "teleport-agent-forward-test")
tmpFile, err := os.CreateTemp(t.TempDir(), "teleport-agent-forward-test")
require.NoError(t, err)
tmpFile.Close()
defer os.Remove(tmpFile.Name())

// type 'printenv SSH_AUTH_SOCK > /path/to/tmp/file' into the session (dumping the value of SSH_AUTH_SOCK into the temp file)
_, err = keyboard.Write([]byte(fmt.Sprintf("printenv %v >> %s\n\r", teleport.SSHAuthSock, tmpFile.Name())))
_, err = fmt.Fprintf(keyboard, "printenv %v >> %s\n\r", teleport.SSHAuthSock, tmpFile.Name())
require.NoError(t, err)

// wait for the output
Expand All @@ -642,8 +655,8 @@ func TestAgentForward(t *testing.T) {
// try dialing the ssh agent socket:
file, err := net.Dial("unix", socketPath)
require.NoError(t, err)
clientAgent := agent.NewClient(file)

clientAgent := agent.NewClient(file)
signers, err := clientAgent.Signers()
require.NoError(t, err)

Expand All @@ -663,6 +676,7 @@ func TestAgentForward(t *testing.T) {
// sessions on the connection).
err = se.Close()
require.NoError(t, err)

// Pause to allow closure to propagate.
time.Sleep(150 * time.Millisecond)
_, err = net.Dial("unix", socketPath)
Expand All @@ -676,14 +690,12 @@ func TestAgentForward(t *testing.T) {

// clt must be nullified to prevent double-close during test cleanup
f.ssh.clt = nil
for i := 0; i < 4; i++ {
_, err = net.Dial("unix", socketPath)
if err != nil {
return
}
time.Sleep(50 * time.Millisecond)
}
require.FailNow(t, "expected socket to be closed, still could dial after 150 ms")
require.Eventually(t, func() bool {
_, err := net.Dial("unix", socketPath)
return err != nil
},
150*time.Millisecond, 50*time.Millisecond,
"expected socket to be closed, still could dial")
}

// TestX11Forward tests x11 forwarding via unix sockets
Expand Down

0 comments on commit 0e8405b

Please sign in to comment.