Skip to content

Commit

Permalink
Stop worker goroutine on auditd metricset shutdown
Browse files Browse the repository at this point in the history
A worker goroutine for reading events from socket was being leaked when the auditd metricset was stopped. The goroutine will now stop when the AuditClient is closed because it will exited on EBADF (bad file descriptor).

I cleaned up a few tests in the process and added a filter for AUDIT_REPLACE messages.
  • Loading branch information
andrewkroh committed May 22, 2018
1 parent 85510a7 commit e709a87
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Fixed a deadlock in the file_integrity module under Windows. {issue}6864[6864]
- Fixed parsing of AppArmor audit messages. {pull}6978[6978]
- Allow `auditbeat setup` to run without requiring elevated privileges for the audit client. {issue}7111[7111]
- Fix goroutine leak that occured when the auditd module was stopped. {pull}7163[7163]

*Filebeat*

Expand Down
6 changes: 6 additions & 0 deletions auditbeat/module/auditd/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@
"module": "auditd",
"type": "user_login"
},
"network": {
"direction": "incoming"
},
"process": {
"exe": "/usr/sbin/sshd",
"pid": "12635"
},
"source": {
"ip": "179.38.151.221"
},
"user": {
"auid": "unset",
"name_map": {
Expand Down
11 changes: 9 additions & 2 deletions auditbeat/module/auditd/_meta/execve.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
"session": "11",
"summary": {
"actor": {
"primary": "1001",
"secondary": "1001"
"primary": "ubuntu",
"secondary": "ubuntu"
},
"how": "/bin/uname",
"object": {
Expand Down Expand Up @@ -88,6 +88,13 @@
"fsgid": "1002",
"fsuid": "1001",
"gid": "1002",
"name_map": {
"auid": "ubuntu",
"euid": "ubuntu",
"fsuid": "ubuntu",
"suid": "ubuntu",
"uid": "ubuntu"
},
"sgid": "1002",
"suid": "1001",
"uid": "1001"
Expand Down
13 changes: 12 additions & 1 deletion auditbeat/module/auditd/audit_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,17 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi
go maintain(done, reassembler)

go func() {
defer ms.log.Debug("receiveEvents goroutine exited")
defer close(out)
defer reassembler.Close()

for {
raw, err := ms.client.Receive(false)
if err != nil {
if errors.Cause(err) == syscall.EBADF {
// Client has been closed.
break
}
continue
}

Expand Down Expand Up @@ -289,8 +294,14 @@ func maintain(done <-chan struct{}, reassembler *libaudit.Reassembler) {
}

func filterRecordType(typ auparse.AuditMessageType) bool {
switch {
// REPLACE messages are tests to check if Auditbeat is still healthy by
// seeing if unicast messages can be sent without error from the kernel.
// Ignore them.
case typ == auparse.AUDIT_REPLACE:
return true
// Messages from 1300-2999 are valid audit message types.
if typ < auparse.AUDIT_USER_AUTH || typ > auparse.AUDIT_LAST_USER_MSG2 {
case typ < auparse.AUDIT_USER_AUTH || typ > auparse.AUDIT_LAST_USER_MSG2:
return true
}

Expand Down
62 changes: 32 additions & 30 deletions auditbeat/module/auditd/audit_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/elastic/beats/auditbeat/core"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/go-libaudit"
"github.com/elastic/go-libaudit/auparse"
Expand Down Expand Up @@ -67,14 +68,10 @@ func TestData(t *testing.T) {
auditMetricSet.client = &libaudit.AuditClient{Netlink: mock}

events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)
for _, e := range events {
if e.Error != nil {
t.Fatalf("received error: %+v", e.Error)
}
}
if len(events) == 0 {
t.Fatal("received no events")
}
assertNoErrors(t, events)

beatEvent := mbtest.StandardizeEvent(ms, events[0], core.AddDatasetToEvent)
mbtest.WriteEventToDataJSON(t, beatEvent, "")
Expand Down Expand Up @@ -110,23 +107,8 @@ func TestUnicastClient(t *testing.T) {

ms := mbtest.NewPushMetricSetV2(t, c)
events := mbtest.RunPushMetricSetV2(5*time.Second, 0, ms)
for _, e := range events {
t.Log(e)

if e.Error != nil {
t.Errorf("received error: %+v", e.Error)
}
}

for _, e := range events {
v, err := e.MetricSetFields.GetValue("thing.primary")
if err == nil {
if exe, ok := v.(string); ok && exe == "/bin/cat" {
return
}
}
}
assert.Fail(t, "expected an execve event for /bin/cat")
assertNoErrors(t, events)
assertHasBinCatExecve(t, events)
}

func TestMulticastClient(t *testing.T) {
Expand Down Expand Up @@ -155,14 +137,8 @@ func TestMulticastClient(t *testing.T) {

ms := mbtest.NewPushMetricSetV2(t, c)
events := mbtest.RunPushMetricSetV2(5*time.Second, 0, ms)
for _, e := range events {
if e.Error != nil {
t.Fatalf("received error: %+v", e.Error)
}
}

// The number of events is non-deterministic so there is no validation.
t.Logf("received %d messages via multicast", len(events))
assertNoErrors(t, events)
assertHasBinCatExecve(t, events)
}

func TestKernelVersion(t *testing.T) {
Expand Down Expand Up @@ -224,3 +200,29 @@ func buildSampleEvent(t testing.TB, lines []string, filename string) {
t.Fatal(err)
}
}

func assertHasBinCatExecve(t *testing.T, events []mb.Event) {
t.Helper()

for _, e := range events {
v, err := e.RootFields.GetValue("process.exe")
if err == nil {
if exe, ok := v.(string); ok && exe == "/bin/cat" {
return
}
}
}
assert.Fail(t, "expected an execve event for /bin/cat")
}

func assertNoErrors(t *testing.T, events []mb.Event) {
t.Helper()

for _, e := range events {
t.Log(e)

if e.Error != nil {
t.Errorf("received error: %+v", e.Error)
}
}
}

0 comments on commit e709a87

Please sign in to comment.