Skip to content

Commit

Permalink
Auditbeat: Fixes for system/socket dataset (elastic#19033)
Browse files Browse the repository at this point in the history
Fixes two problems with the system/socket dataset:

- A bug in the internal state of the socket dataset that lead to an infinite
  loop in systems were the kernel aggressively reuses sockets (observed
  in kernel 2.6 / CentOS/RHEL 6.x).
- Socket expiration wasn't working as expected due to it using an
  uninitialized timestamp: Flows were expiring at every check.

Also fixes other two minor issues:

- A flow could be terminated twice by different code paths leading to wrong
  numFlows calculation and duplicated flows indexed.
- Decoupled the status debug log and socket cleanup into separate goroutines
  so that logging is still performed under high load situations.
  • Loading branch information
adriansr authored Jun 9, 2020
1 parent 247a00b commit 665b67f
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- system/package: Fix parsing of Installed-Size field of DEB packages. {issue}16661[16661] {pull}17188[17188]
- system module: Fix panic during initialisation when /proc/stat can't be read. {pull}17569[17569]
- system/package: Fix an error that can occur while trying to persist package metadata. {issue}18536[18536] {pull}18887[18887]
- system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033]
- system/socket: Fixed tracking of long-running connections. {pull}19033[19033]

*Filebeat*

Expand Down
60 changes: 40 additions & 20 deletions x-pack/auditbeat/module/system/socket/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type flow struct {
process *process
local, remote endpoint
complete bool

done bool
// these are automatically calculated by state from kernelTimes above
createdTime, lastSeenTime time.Time
}
Expand Down Expand Up @@ -253,7 +253,6 @@ type socket struct {
process *process
// This signals that the socket is in the closeTimeout list.
closing bool
closeTime time.Time
prev, next linkedElement

createdTime, lastSeenTime time.Time
Expand Down Expand Up @@ -281,7 +280,7 @@ func (s *socket) SetNext(e linkedElement) {

// Timestamp returns the time reference used to expire sockets.
func (s *socket) Timestamp() time.Time {
return s.closeTime
return s.lastSeenTime
}

type dnsTracker struct {
Expand Down Expand Up @@ -372,13 +371,16 @@ type state struct {
closing linkedList

dns dnsTracker

// Decouple time.Now()
clock func() time.Time
}

func (s *state) getSocket(sock uintptr) *socket {
if socket, found := s.socks[sock]; found {
return socket
}
now := time.Now()
now := s.clock()
socket := &socket{
sock: sock,
createdTime: now,
Expand All @@ -397,6 +399,7 @@ var kernelProcess = process{
func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
s := makeState(r, log, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift)
go s.reapLoop()
go s.logStateLoop()
return s
}

Expand All @@ -412,6 +415,7 @@ func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTi
closeTimeout: closeTimeout,
clockMaxDrift: clockMaxDrift,
dns: newDNSTracker(inactiveTimeout * 2),
clock: time.Now,
}
}

Expand All @@ -438,7 +442,7 @@ func (s *state) logState() {
events := atomic.LoadUint64(&eventCount)
s.Unlock()

now := time.Now()
now := s.clock()
took := now.Sub(lastTime)
newEvs := events - lastEvents
lastEvents = events
Expand All @@ -461,8 +465,6 @@ func (s *state) logState() {
func (s *state) reapLoop() {
reportTicker := time.NewTicker(reapInterval)
defer reportTicker.Stop()
logTicker := time.NewTicker(logInterval)
defer logTicker.Stop()
for {
select {
case <-s.reporter.Done():
Expand All @@ -489,6 +491,17 @@ func (s *state) reapLoop() {
return
}
}
}
}
}

func (s *state) logStateLoop() {
logTicker := time.NewTicker(logInterval)
defer logTicker.Stop()
for {
select {
case <-s.reporter.Done():
return
case <-logTicker.C:
s.logState()
}
Expand All @@ -498,7 +511,7 @@ func (s *state) reapLoop() {
func (s *state) ExpireOlder() {
s.Lock()
defer s.Unlock()
deadline := time.Now().Add(-s.inactiveTimeout)
deadline := s.clock().Add(-s.inactiveTimeout)
for item := s.flowLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if flow, ok := item.(*flow); ok {
s.onFlowTerminated(flow)
Expand All @@ -507,8 +520,7 @@ func (s *state) ExpireOlder() {
}
item = s.flowLRU.peek()
}

deadline = time.Now().Add(-s.socketTimeout)
deadline = s.clock().Add(-s.socketTimeout)
for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); {
if sock, ok := item.(*socket); ok {
s.onSockDestroyed(sock.sock, 0)
Expand All @@ -517,8 +529,7 @@ func (s *state) ExpireOlder() {
}
item = s.socketLRU.peek()
}

deadline = time.Now().Add(-s.closeTimeout)
deadline = s.clock().Add(-s.closeTimeout)
for item := s.closing.peek(); item != nil && item.Timestamp().Before(deadline); {
if sock, ok := item.(*socket); ok {
s.onSockTerminated(sock)
Expand Down Expand Up @@ -601,6 +612,8 @@ func (s *state) onSockTerminated(sock *socket) {
delete(s.socks, sock.sock)
if sock.closing {
s.closing.remove(sock)
} else {
s.moveToClosing(sock)
}
}

Expand Down Expand Up @@ -659,7 +672,7 @@ func (s *state) mutualEnrich(sock *socket, f *flow) {
f.process = sock.process
}
if !sock.closing {
sock.lastSeenTime = time.Now()
sock.lastSeenTime = s.clock()
s.socketLRU.remove(sock)
s.socketLRU.add(sock)
}
Expand Down Expand Up @@ -699,7 +712,6 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error {
if !found {
return nil
}

// Enrich with pid
if sock.pid == 0 && pid != 0 {
sock.pid = pid
Expand All @@ -710,14 +722,18 @@ func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error {
// Keep the sock around in case it's a connected TCP socket, as still some
// packets can be received shortly after/during inet_release.
if !sock.closing {
sock.closeTime = time.Now()
sock.closing = true
s.socketLRU.remove(sock)
s.closing.add(sock)
s.moveToClosing(sock)
}
return nil
}

func (s *state) moveToClosing(sock *socket) {
sock.lastSeenTime = s.clock()
sock.closing = true
s.socketLRU.remove(sock)
s.closing.add(sock)
}

// UpdateFlow receives a partial flow and creates or updates an existing flow.
func (s *state) UpdateFlow(ref flow) error {
return s.UpdateFlowWithCondition(ref, nil)
Expand Down Expand Up @@ -794,7 +810,11 @@ func (f *flow) updateWith(ref flow, s *state) {
}

func (s *state) onFlowTerminated(f *flow) {
if f.done {
return
}
s.flowLRU.remove(f)
f.done = true
// Unbind this flow from its parent
if parent, found := s.socks[f.sock]; found {
delete(parent.flows, f.remote.addr.String())
Expand Down Expand Up @@ -1011,8 +1031,8 @@ func (s *state) kernTimestampToTime(ts kernelTime) time.Time {
}
if s.kernelEpoch == (time.Time{}) {
// This is the first event and time sync hasn't happened yet.
// Take a temporary epoch relative to time.Now()
now := time.Now()
// Take a temporary epoch relative to current time.
now := s.clock()
s.kernelEpoch = now.Add(-time.Duration(ts))
return now
}
Expand Down
126 changes: 114 additions & 12 deletions x-pack/auditbeat/module/system/socket/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/binary"
"fmt"
"net"
"os"
"testing"
"time"

Expand Down Expand Up @@ -134,16 +135,24 @@ func TestTCPConnWithProcess(t *testing.T) {

func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
const (
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 443
sock uintptr = 0xff1234
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 443
sock uintptr = 0xff1234
flowTimeout = time.Hour
socketTimeout = time.Minute * 3
closeTimeout = time.Minute
)
st := makeState(nil, (*logWrapper)(t), time.Second, 0, 0, time.Second)
st := makeState(nil, (*logWrapper)(t), flowTimeout, socketTimeout, closeTimeout, time.Second)
now := time.Now()
st.clock = func() time.Time {
return now
}
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
evs := []event{

callExecve(meta(1234, 1234, 1), []string{"/usr/bin/curl", "https://example.net/", "-o", "/tmp/site.html"}),
&commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20},
&execveRet{Meta: meta(1234, 1234, 2), Retval: 1234},
Expand Down Expand Up @@ -174,7 +183,18 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
t.Fatal(err)
}
st.ExpireOlder()
// Nothing expired just yet.
flows, err := getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Empty(t, flows)

evs = []event{
&clockSyncCall{
Meta: meta(uint32(os.Getpid()), 1235, 0),
Ts: uint64(now.UnixNano()),
},
&inetReleaseCall{Meta: meta(0, 0, 15), Sock: sock},
&tcpV4DoRcv{
Meta: meta(0, 0, 17),
Expand All @@ -185,17 +205,31 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
RAddr: rAddr,
RPort: rPort,
},
&doExit{Meta: meta(1234, 1234, 18)},

&inetCreate{Meta: meta(1234, 1235, 18), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 19), Sock: sock + 1},
&tcpIPv4ConnectCall{Meta: meta(1234, 1235, 20), Sock: sock + 1, RAddr: rAddr, RPort: rPort},
&tcpV4DoRcv{
Meta: meta(0, 0, 21),
Sock: sock + 1,
Size: 12,
LAddr: lAddr,
LPort: lPort,
RAddr: rAddr,
RPort: rPort,
},
}
if err := feedEvents(evs, st, t); err != nil {
t.Fatal(err)
}
// Expire the first socket
now = now.Add(closeTimeout + 1)
st.ExpireOlder()
flows, err := getFlows(st.DoneFlows(), all)
flows, err = getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Len(t, flows, 2)
assert.Len(t, flows, 1)
flow := flows[0]
t.Log("read flow 0", flow)
for field, expected := range map[string]interface{}{
Expand All @@ -207,8 +241,8 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
"client.port": localPort,
"destination.ip": remoteIP,
"destination.port": remotePort,
"destination.packets": uint64(1),
"destination.bytes": uint64(12),
"destination.packets": uint64(2),
"destination.bytes": uint64(19),
"server.ip": remoteIP,
"server.port": remotePort,
"network.direction": "outbound",
Expand All @@ -224,10 +258,28 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) {
t.Fatal("expected value not found")
}
}
// Wait until sock+1 expires due to inactivity. It won't be available
// just yet.
now = now.Add(socketTimeout + 1)
st.ExpireOlder()
flows, err = getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Empty(t, flows)

// Wait until the sock is closed completely.
now = now.Add(closeTimeout + 1)
st.ExpireOlder()
flows, err = getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Len(t, flows, 1)
flow = flows[0]

// we have a truncated flow with no directionality,
// so just report what we can
flow = flows[1]
t.Log("read flow 1", flow)
for field, expected := range map[string]interface{}{
"source.ip": localIP,
Expand Down Expand Up @@ -706,5 +758,55 @@ func TestUDPSendMsgAltLogic(t *testing.T) {
ev.AltRAddrA, ev.AltRAddrB = ipv6("fddd::cafe")
assert.Equal(t, expectedIPv6, ev.String())
})
}

func TestSocketReuse(t *testing.T) {
const (
localIP = "192.168.33.10"
remoteIP = "172.19.12.13"
localPort = 38842
remotePort = 53
sock uintptr = 0xff1234
)
st := makeState(nil, (*logWrapper)(t), time.Hour, time.Hour, 0, time.Hour)
lPort, rPort := be16(localPort), be16(remotePort)
lAddr, rAddr := ipv4(localIP), ipv4(remoteIP)
evs := []event{
&clockSyncCall{
Meta: meta(uint32(os.Getpid()), 1235, 5),
Ts: uint64(time.Now().UnixNano()),
},
&inetCreate{Meta: meta(1234, 1235, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 5), Sock: sock},
&udpSendMsgCall{
Meta: meta(1234, 1235, 6),
Sock: sock,
Size: 123,
LAddr: lAddr,
AltRAddr: rAddr,
LPort: lPort,
AltRPort: rPort,
},
// Asume inetRelease lost.
&inetCreate{Meta: meta(1234, 1235, 5), Proto: 0},
&sockInitData{Meta: meta(1234, 1235, 5), Sock: sock},
&udpSendMsgCall{
Meta: meta(1234, 1235, 6),
Sock: sock,
Size: 123,
LAddr: lAddr,
AltRAddr: rAddr,
LPort: lPort,
AltRPort: rPort,
},
}
if err := feedEvents(evs, st, t); err != nil {
t.Fatal(err)
}
st.ExpireOlder()
flows, err := getFlows(st.DoneFlows(), all)
if err != nil {
t.Fatal(err)
}
assert.Len(t, flows, 1)
}

0 comments on commit 665b67f

Please sign in to comment.