From e709a878396d6300381588dc61e45a409e538ef6 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Tue, 22 May 2018 18:27:40 -0400 Subject: [PATCH] Stop worker goroutine on auditd metricset shutdown A worker goroutine for reading events from socket was being leaked when the auditd metricset was stopped. The goroutine will now stop when the AuditClient is closed because it will exited on EBADF (bad file descriptor). I cleaned up a few tests in the process and added a filter for AUDIT_REPLACE messages. --- CHANGELOG.asciidoc | 1 + auditbeat/module/auditd/_meta/data.json | 6 ++ auditbeat/module/auditd/_meta/execve.json | 11 +++- auditbeat/module/auditd/audit_linux.go | 13 ++++- auditbeat/module/auditd/audit_linux_test.go | 62 +++++++++++---------- 5 files changed, 60 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ec90e42181f..34ef2015f03 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -69,6 +69,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Fixed a deadlock in the file_integrity module under Windows. {issue}6864[6864] - Fixed parsing of AppArmor audit messages. {pull}6978[6978] - Allow `auditbeat setup` to run without requiring elevated privileges for the audit client. {issue}7111[7111] +- Fix goroutine leak that occured when the auditd module was stopped. {pull}7163[7163] *Filebeat* diff --git a/auditbeat/module/auditd/_meta/data.json b/auditbeat/module/auditd/_meta/data.json index 63b2a3b3f0a..d84f6037f3b 100644 --- a/auditbeat/module/auditd/_meta/data.json +++ b/auditbeat/module/auditd/_meta/data.json @@ -32,10 +32,16 @@ "module": "auditd", "type": "user_login" }, + "network": { + "direction": "incoming" + }, "process": { "exe": "/usr/sbin/sshd", "pid": "12635" }, + "source": { + "ip": "179.38.151.221" + }, "user": { "auid": "unset", "name_map": { diff --git a/auditbeat/module/auditd/_meta/execve.json b/auditbeat/module/auditd/_meta/execve.json index 1e2bcda571d..7b5e0b62a47 100644 --- a/auditbeat/module/auditd/_meta/execve.json +++ b/auditbeat/module/auditd/_meta/execve.json @@ -40,8 +40,8 @@ "session": "11", "summary": { "actor": { - "primary": "1001", - "secondary": "1001" + "primary": "ubuntu", + "secondary": "ubuntu" }, "how": "/bin/uname", "object": { @@ -88,6 +88,13 @@ "fsgid": "1002", "fsuid": "1001", "gid": "1002", + "name_map": { + "auid": "ubuntu", + "euid": "ubuntu", + "fsuid": "ubuntu", + "suid": "ubuntu", + "uid": "ubuntu" + }, "sgid": "1002", "suid": "1001", "uid": "1001" diff --git a/auditbeat/module/auditd/audit_linux.go b/auditbeat/module/auditd/audit_linux.go index c7216a13fb2..c4380f0b2b9 100644 --- a/auditbeat/module/auditd/audit_linux.go +++ b/auditbeat/module/auditd/audit_linux.go @@ -243,12 +243,17 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi go maintain(done, reassembler) go func() { + defer ms.log.Debug("receiveEvents goroutine exited") defer close(out) defer reassembler.Close() for { raw, err := ms.client.Receive(false) if err != nil { + if errors.Cause(err) == syscall.EBADF { + // Client has been closed. + break + } continue } @@ -289,8 +294,14 @@ func maintain(done <-chan struct{}, reassembler *libaudit.Reassembler) { } func filterRecordType(typ auparse.AuditMessageType) bool { + switch { + // REPLACE messages are tests to check if Auditbeat is still healthy by + // seeing if unicast messages can be sent without error from the kernel. + // Ignore them. + case typ == auparse.AUDIT_REPLACE: + return true // Messages from 1300-2999 are valid audit message types. - if typ < auparse.AUDIT_USER_AUTH || typ > auparse.AUDIT_LAST_USER_MSG2 { + case typ < auparse.AUDIT_USER_AUTH || typ > auparse.AUDIT_LAST_USER_MSG2: return true } diff --git a/auditbeat/module/auditd/audit_linux_test.go b/auditbeat/module/auditd/audit_linux_test.go index c4e657f54ed..08f8ba460b7 100644 --- a/auditbeat/module/auditd/audit_linux_test.go +++ b/auditbeat/module/auditd/audit_linux_test.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/beats/auditbeat/core" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/elastic/go-libaudit" "github.com/elastic/go-libaudit/auparse" @@ -67,14 +68,10 @@ func TestData(t *testing.T) { auditMetricSet.client = &libaudit.AuditClient{Netlink: mock} events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) - for _, e := range events { - if e.Error != nil { - t.Fatalf("received error: %+v", e.Error) - } - } if len(events) == 0 { t.Fatal("received no events") } + assertNoErrors(t, events) beatEvent := mbtest.StandardizeEvent(ms, events[0], core.AddDatasetToEvent) mbtest.WriteEventToDataJSON(t, beatEvent, "") @@ -110,23 +107,8 @@ func TestUnicastClient(t *testing.T) { ms := mbtest.NewPushMetricSetV2(t, c) events := mbtest.RunPushMetricSetV2(5*time.Second, 0, ms) - for _, e := range events { - t.Log(e) - - if e.Error != nil { - t.Errorf("received error: %+v", e.Error) - } - } - - for _, e := range events { - v, err := e.MetricSetFields.GetValue("thing.primary") - if err == nil { - if exe, ok := v.(string); ok && exe == "/bin/cat" { - return - } - } - } - assert.Fail(t, "expected an execve event for /bin/cat") + assertNoErrors(t, events) + assertHasBinCatExecve(t, events) } func TestMulticastClient(t *testing.T) { @@ -155,14 +137,8 @@ func TestMulticastClient(t *testing.T) { ms := mbtest.NewPushMetricSetV2(t, c) events := mbtest.RunPushMetricSetV2(5*time.Second, 0, ms) - for _, e := range events { - if e.Error != nil { - t.Fatalf("received error: %+v", e.Error) - } - } - - // The number of events is non-deterministic so there is no validation. - t.Logf("received %d messages via multicast", len(events)) + assertNoErrors(t, events) + assertHasBinCatExecve(t, events) } func TestKernelVersion(t *testing.T) { @@ -224,3 +200,29 @@ func buildSampleEvent(t testing.TB, lines []string, filename string) { t.Fatal(err) } } + +func assertHasBinCatExecve(t *testing.T, events []mb.Event) { + t.Helper() + + for _, e := range events { + v, err := e.RootFields.GetValue("process.exe") + if err == nil { + if exe, ok := v.(string); ok && exe == "/bin/cat" { + return + } + } + } + assert.Fail(t, "expected an execve event for /bin/cat") +} + +func assertNoErrors(t *testing.T, events []mb.Event) { + t.Helper() + + for _, e := range events { + t.Log(e) + + if e.Error != nil { + t.Errorf("received error: %+v", e.Error) + } + } +}