Skip to content

Commit

Permalink
[v8] auditlog: fix panic during concurrent streams of the same session (
Browse files Browse the repository at this point in the history
#15376)

The code attempts to wait on an existing download if one is already
in progress rather than starting a concurrent download of the same
session. If this code path runs, we incorrectly defer a call to a
nil function, triggering a panic.

This bug was introduced in #7360.
  • Loading branch information
zmb3 authored Aug 12, 2022
1 parent 4e1fbdf commit a1d7188
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
4 changes: 3 additions & 1 deletion lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,8 +1064,10 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
e <- trace.BadParameter("audit log is closing, aborting the download")
return c, e
}
} else {
defer cancel()
}
defer cancel()

rawSession, err := os.OpenFile(tarballPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0640)
if err != nil {
e <- trace.Wrap(err)
Expand Down
51 changes: 51 additions & 0 deletions lib/events/auditlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
"gopkg.in/check.v1"

"github.com/gravitational/teleport"
Expand Down Expand Up @@ -464,6 +467,54 @@ func (a *AuditTestSuite) TestLegacyHandler(c *check.C) {
c.Assert(err, check.IsNil)
}

// TestConcurrentStreaming is a regression test verifying that the audit log
// does not panic when multiple concurrent streams request the same session:
// the second stream must wait on the download started by the first instead
// of deferring a call to a nil cancel function.
func TestConcurrentStreaming(t *testing.T) {
	uploader := NewMemoryUploader()
	alog, err := NewAuditLog(AuditLogConfig{
		DataDir:        t.TempDir(),
		RecordSessions: true,
		Clock:          clockwork.NewFakeClock(),
		ServerID:       "remote",
		UploadHandler:  uploader,
	})
	require.NoError(t, err)
	t.Cleanup(func() { alog.Close() })

	ctx := context.Background()
	sid := session.ID("abc123")

	// upload a bogus session so that we can try to stream its events
	// (this is not valid protobuf, so the stream is not expected to succeed)
	_, err = uploader.Upload(ctx, sid, io.NopCloser(strings.NewReader(`asdfasdfasdfasdfasdef`)))
	require.NoError(t, err)

	// run multiple concurrent streams, which forces the second one to wait
	// on the download that the first one started
	streams := 2
	errors := make(chan error, streams)
	for i := 0; i < streams; i++ {
		go func() {
			eventsC, errC := alog.StreamSessionEvents(ctx, sid, 0)
			for {
				select {
				case err := <-errC:
					errors <- err
					// Return once the stream reports an error. Without this,
					// the goroutine keeps looping and can send a second value
					// (nil, when eventsC closes) into the channel, exceeding
					// its buffer of `streams` and leaking a blocked goroutine.
					return
				case _, ok := <-eventsC:
					if !ok {
						// Channel closed without an error: stream finished.
						errors <- nil
						return
					}
				}
			}
		}()
	}

	// This test just verifies that the streamer does not panic when multiple
	// concurrent streams are waiting on the same download to complete; each
	// goroutine reports exactly one result.
	for i := 0; i < streams; i++ {
		<-errors
	}
}

// TestExternalLog tests forwarding server and upload
// server case
func (a *AuditTestSuite) TestExternalLog(c *check.C) {
Expand Down
11 changes: 11 additions & 0 deletions lib/events/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ func (u *UploadCompleter) CheckUploads(ctx context.Context) error {

uploadData := u.cfg.Uploader.GetUploadMetadata(upload.SessionID)

// It's possible that we don't have a session ID here. For example,
// an S3 multipart upload may have been completed by another auth
// server, in which case the API returns an empty key, leaving us
// no way to derive the session ID from the upload.
//
// If this is the case, there's no work left to do, and we can
// proceed to the next upload.
if uploadData.SessionID == "" {
continue
}

// Schedule a background operation to check for (and emit) a session end event.
// This is necessary because we'll need to download the session in order to
// enumerate its events, and the S3 API takes a little while after the upload
Expand Down

0 comments on commit a1d7188

Please sign in to comment.