Skip to content

Commit

Permalink
Auditbeat: Add backpressure_strategy option (#7157) (#7185)
Browse files Browse the repository at this point in the history
This adds a new configuration option, "backpressure_strategy", to the auditd
module in auditbeat. It selects how auditbeat mitigates or prevents
backpressure from propagating into the kernel and impacting audited
processes.

The possible values are:
- "kernel": Auditbeat will set the backlog_wait_time in the kernel's
  audit framework to 0. This causes events to be discarded in kernel if
  the audit backlog queue fills to capacity. Requires a 3.14 kernel or
  newer.
- "userspace": Auditbeat will drop events when there is backpressure
  from the publishing pipeline. If no rate_limit is set then it will set a rate
  limit of 5000. Users should test their setup and adjust the rate_limit
  option accordingly.
- "both": "kernel" and "userspace" strategies at the same time.
- "auto" (default): The "kernel" strategy will be used, if supported.
  Otherwise will fall back to "userspace".
- "none": No backpressure mitigation measures will be enabled.

Closes #7157

Other Changes:

* Increase default `reassembler.queue_size` to 8192.

* Change reassembler lost metric to count sequence gaps. It was renamed to `auditd.reassembler_seq_gaps`.

* Add received metric that counts the total number of received messages. It's called `auditd.received_msgs`.

* Auditd module ignores its own syscall invocations by adding a kernel audit rule that ignores events from its own PID. This rule is added any time the user has defined audit rules.

* Make the number of stream buffer consumers configurable.

  Originally there was only one consumer for the auditd stream buffer.
  This patch makes the number of consumers configurable through the new
  `stream_buffer_consumers` setting in Auditd.

  By default it will use as many consumers as GOMAXPROCS, with a maximum of 4.
  • Loading branch information
adriansr authored and andrewkroh committed Jun 5, 2018
1 parent 8ed5dfc commit 124c8a2
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 27 deletions.
186 changes: 161 additions & 25 deletions auditbeat/module/auditd/audit_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package auditd
import (
"fmt"
"os"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -18,6 +20,7 @@ import (
"github.com/elastic/go-libaudit"
"github.com/elastic/go-libaudit/aucoalesce"
"github.com/elastic/go-libaudit/auparse"
"github.com/elastic/go-libaudit/rule"
)

const (
Expand All @@ -28,13 +31,24 @@ const (
unicast = "unicast"
multicast = "multicast"

lostEventsUpdateInterval = time.Second * 15
lostEventsUpdateInterval = time.Second * 15
maxDefaultStreamBufferConsumers = 4
)

// backpressureStrategy is a bit set describing which backpressure mitigation
// measures are active. The zero value means no mitigation ("none").
type backpressureStrategy uint8

// Flags that can be combined in a backpressureStrategy value.
const (
	// bsKernel asks the kernel to drop audit events immediately when its
	// backlog queue is full, by setting the backlog wait time to 0.
	bsKernel backpressureStrategy = 1 << 0
	// bsUserSpace makes the module drop events itself rather than block when
	// the output channel is full.
	bsUserSpace backpressureStrategy = 1 << 1
	// bsAuto tries the kernel strategy first and enables the user-space one
	// when the kernel does not support setting the backlog wait time.
	bsAuto backpressureStrategy = 1 << 2
)

// Monitoring counters exposed by the auditd module.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so it
// shows both sides of the change. Per the commit message, "reassembler_lost"
// was renamed to "reassembler_seq_gaps"; only one of the two presumably
// exists in the resulting file — confirm against the repository.
var (
// auditdMetrics is the registry, named after the module, that the counters
// below are registered in.
auditdMetrics = monitoring.Default.NewRegistry(moduleName)
// reassemblerlostMetric is the pre-change counter, incremented once per
// EventsLost callback.
reassemblerlostMetric = monitoring.NewInt(auditdMetrics, "reassembler_lost")
// reassemblerGapsMetric accumulates the count passed to EventsLost.
reassemblerGapsMetric = monitoring.NewInt(auditdMetrics, "reassembler_seq_gaps")
// kernelLostMetric: presumably events lost in the kernel; its update site is
// not visible in this view.
kernelLostMetric = monitoring.NewInt(auditdMetrics, "kernel_lost")
// userspaceLostMetric accumulates the number of messages discarded by the
// non-blocking stream when the output channel is full.
userspaceLostMetric = monitoring.NewInt(auditdMetrics, "userspace_lost")
// receivedMetric is incremented for every received message that passes the
// record-type filter, just before it is pushed to the reassembler.
receivedMetric = monitoring.NewInt(auditdMetrics, "received_msgs")
)

func init() {
Expand All @@ -58,6 +72,7 @@ type MetricSet struct {
enabled bool
counter uint32
}
backpressureStrategy backpressureStrategy
}

// New constructs a new MetricSet.
Expand All @@ -76,14 +91,17 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.Wrap(err, "failed to create audit client")
}

reassemblerlostMetric.Set(0)
reassemblerGapsMetric.Set(0)
kernelLostMetric.Set(0)
userspaceLostMetric.Set(0)
receivedMetric.Set(0)

return &MetricSet{
BaseMetricSet: base,
client: client,
config: config,
log: log,
BaseMetricSet: base,
client: client,
config: config,
log: log,
backpressureStrategy: getBackpressureStrategy(config.BackpressureStrategy, log),
}, nil
}

Expand Down Expand Up @@ -128,9 +146,7 @@ func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
case <-reporter.Done():
return
case <-timer.C:
if seq, err := ms.client.GetStatusAsync(false); err == nil {
ms.log.Debugf("sent async status request seq=%d", seq)
} else {
if _, err := ms.client.GetStatusAsync(false); err != nil {
ms.log.Error("get async status request failed:", err)
}
case status := <-statusC:
Expand All @@ -140,14 +156,32 @@ func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
}()
}

for {
select {
case <-reporter.Done():
return
case msgs := <-out:
reporter.Event(buildMetricbeatEvent(msgs, ms.config))
// Spawn the stream buffer consumers
numConsumers := ms.config.StreamBufferConsumers
// By default (stream_buffer_consumers=0) use as many consumers as local CPUs
// with a max of `maxDefaultStreamBufferConsumers`
if numConsumers == 0 {
if numConsumers = runtime.GOMAXPROCS(-1); numConsumers > maxDefaultStreamBufferConsumers {
numConsumers = maxDefaultStreamBufferConsumers
}
}
var wg sync.WaitGroup
wg.Add(numConsumers)

for i := 0; i < numConsumers; i++ {
go func() {
defer wg.Done()
for {
select {
case <-reporter.Done():
return
case msgs := <-out:
reporter.Event(buildMetricbeatEvent(msgs, ms.config))
}
}
}()
}
wg.Wait()
}

func (ms *MetricSet) addRules(reporter mb.PushReporterV2) error {
Expand Down Expand Up @@ -186,6 +220,12 @@ func (ms *MetricSet) addRules(reporter mb.PushReporterV2) error {
}
ms.log.Infof("Deleted %v pre-existing audit rules.", n)

// Add rule to ignore syscalls from this process
if rule, err := buildPIDIgnoreRule(os.Getpid()); err == nil {
rules = append([]auditRule{rule}, rules...)
} else {
ms.log.Errorf("Failed to build a rule to ignore self: %v", err)
}
// Add rules from config.
var failCount int
for _, rule := range rules {
Expand Down Expand Up @@ -233,18 +273,44 @@ func (ms *MetricSet) initClient() error {
}
}

if status.RateLimit != ms.config.RateLimit {
if err = ms.client.SetRateLimit(ms.config.RateLimit, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to set audit rate limit in kernel")
}
}

if status.BacklogLimit != ms.config.BacklogLimit {
if err = ms.client.SetBacklogLimit(ms.config.BacklogLimit, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to set audit backlog limit in kernel")
}
}

if ms.backpressureStrategy&(bsKernel|bsAuto) != 0 {
// "kernel" backpressure mitigation strategy
//
// configure the kernel to drop audit events immediately if the
// backlog queue is full.
if status.FeatureBitmap&libaudit.AuditFeatureBitmapBacklogWaitTime != 0 {
ms.log.Info("Setting kernel backlog wait time to prevent backpressure propagating to the kernel.")
if err = ms.client.SetBacklogWaitTime(0, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to set audit backlog wait time in kernel")
}
} else {
if ms.backpressureStrategy == bsAuto {
ms.log.Warn("setting backlog wait time is not supported in this kernel. Enabling workaround.")
ms.backpressureStrategy |= bsUserSpace
} else {
return errors.New("kernel backlog wait time not supported by kernel, but required by backpressure_strategy")
}
}
}

if ms.backpressureStrategy&(bsKernel|bsUserSpace) == bsUserSpace && ms.config.RateLimit == 0 {
// force a rate limit if the user-space strategy will be used without
// corresponding backlog_wait_time setting in the kernel
ms.config.RateLimit = 5000
}

if status.RateLimit != ms.config.RateLimit {
if err = ms.client.SetRateLimit(ms.config.RateLimit, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to set audit rate limit in kernel")
}
}

if status.Enabled == 0 {
if err = ms.client.SetEnabled(true, libaudit.NoWait); err != nil {
return errors.Wrap(err, "failed to enable auditing in the kernel")
Expand Down Expand Up @@ -287,7 +353,17 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi

out := make(chan []*auparse.AuditMessage, ms.config.StreamBufferQueueSize)
statusC := make(chan *libaudit.AuditStatus, 8)
reassembler, err := libaudit.NewReassembler(int(ms.config.ReassemblerMaxInFlight), ms.config.ReassemblerTimeout, &stream{done, out})

var st libaudit.Stream = &stream{done, out}
if ms.backpressureStrategy&bsUserSpace != 0 {
// "user-space" backpressure mitigation strategy
//
// Consume events from our side as fast as possible, by dropping events
// if the publishing pipeline would block.
ms.log.Info("Using non-blocking stream to prevent backpressure propagating to the kernel.")
st = &nonBlockingStream{done, out}
}
reassembler, err := libaudit.NewReassembler(int(ms.config.ReassemblerMaxInFlight), ms.config.ReassemblerTimeout, st)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create Reassembler")
}
Expand Down Expand Up @@ -320,7 +396,7 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi
if filterRecordType(raw.Type) {
continue
}

receivedMetric.Inc()
if err := reassembler.Push(raw.Type, raw.Data); err != nil {
ms.log.Debugw("Dropping audit message",
"record_type", raw.Type,
Expand Down Expand Up @@ -596,7 +672,26 @@ func (s *stream) ReassemblyComplete(msgs []*auparse.AuditMessage) {
}

// EventsLost is the lost-events callback of the libaudit.Stream
// implementation handed to the reassembler. count is the number of lost
// events reported by the caller.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The Inc
// call is the pre-change line and the Add call is its replacement, so only
// the Add call is presumably present in the resulting file — confirm against
// the repository.
func (s *stream) EventsLost(count int) {
reassemblerlostMetric.Inc()
reassemblerGapsMetric.Add(int64(count))
}

// nonBlockingStream behaves like stream, except that it never blocks on
// backpressure from the publishing pipeline: when the output channel cannot
// accept a batch of messages immediately, the batch is discarded instead.
//
// It is defined with stream as its underlying type, so it has the same
// fields (done, out) but declares its own methods.
type nonBlockingStream stream

// ReassemblyComplete hands a reassembled group of audit messages to the
// output channel without ever blocking. If the stream is done, the messages
// are silently dropped. If the channel cannot take them right now, they are
// discarded and counted in the userspace_lost metric.
func (s *nonBlockingStream) ReassemblyComplete(msgs []*auparse.AuditMessage) {
	select {
	case s.out <- msgs:
		// Delivered to a consumer (or buffered in the channel).
	case <-s.done:
		// Shutting down; nothing to deliver or account for.
	default:
		// Neither case is ready: the channel is full, so drop the batch
		// rather than stall the receive loop.
		dropped := int64(len(msgs))
		userspaceLostMetric.Add(dropped)
	}
}

// EventsLost reports lost events exactly as the blocking stream does, by
// delegating to (*stream).EventsLost on the same underlying value.
func (s *nonBlockingStream) EventsLost(count int) {
	base := (*stream)(s)
	base.EventsLost(count)
}

func hasMulticastSupport() bool {
Expand Down Expand Up @@ -728,3 +823,44 @@ func determineSocketType(c *Config, log *logp.Logger) (string, error) {
}

}

// getBackpressureStrategy maps the textual backpressure_strategy setting to
// its flag representation.
//
// Recognized values are "kernel", "userspace" (or "user-space"), "both",
// "none", "auto", and "" or "default" (both treated as "auto"). Any other
// value logs a warning through logger and is also treated as "auto".
func getBackpressureStrategy(value string, logger *logp.Logger) backpressureStrategy {
	switch value {
	case "", "default", "auto":
		return bsAuto
	case "kernel":
		return bsKernel
	case "userspace", "user-space":
		return bsUserSpace
	case "both":
		return bsKernel | bsUserSpace
	case "none":
		return 0
	}

	// Unrecognized setting: warn and fall back to the default strategy.
	logger.Warn("Unknown value for the 'backpressure_strategy' option. Using default.")
	return bsAuto
}

// buildPIDIgnoreRule builds an audit rule that excludes every syscall made
// by the process with the given pid ("-A exit,never -F pid=<pid> -S all").
//
// The returned auditRule carries the textual form of the rule in flags and
// the output of rule.Build in data. err is whatever rule.Build returned.
func buildPIDIgnoreRule(pid int) (ruleData auditRule, err error) {
	// Match on the exact process ID.
	pidFilter := rule.FilterSpec{
		Type:       rule.ValueFilterType,
		LHS:        "pid",
		Comparator: "=",
		RHS:        strconv.Itoa(pid),
	}

	// Append to the "exit" list with action "never", for all syscalls.
	spec := rule.SyscallRule{
		Type:     rule.AppendSyscallRuleType,
		List:     "exit",
		Action:   "never",
		Filters:  []rule.FilterSpec{pidFilter},
		Syscalls: []string{"all"},
	}

	ruleData.flags = fmt.Sprintf("-A exit,never -F pid=%d -S all", pid)
	ruleData.data, err = rule.Build(&spec)
	return ruleData, err
}
2 changes: 1 addition & 1 deletion auditbeat/module/auditd/audit_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestData(t *testing.T) {
// Get Status response for initClient
returnACK().returnStatus().
// Send expected ACKs for initialization
returnACK().returnACK().returnACK().returnACK().
returnACK().returnACK().returnACK().returnACK().returnACK().
// Send a single audit message from the kernel.
returnMessage(userLoginMsg)

Expand Down
9 changes: 8 additions & 1 deletion auditbeat/module/auditd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ type Config struct {
ReassemblerMaxInFlight uint32 `config:"reassembler.max_in_flight"`
ReassemblerTimeout time.Duration `config:"reassembler.timeout"`
StreamBufferQueueSize uint32 `config:"reassembler.queue_size"`
// BackpressureStrategy defines the strategy used to mitigate backpressure
// propagating to the kernel causing audited processes to block until
// Auditbeat can keep-up.
// One of "user-space", "kernel", "both", "none", "auto" (default)
BackpressureStrategy string `config:"backpressure_strategy"`
StreamBufferConsumers int `config:"stream_buffer_consumers"`
}

type auditRule struct {
Expand Down Expand Up @@ -129,5 +135,6 @@ var defaultConfig = Config{
Warnings: false,
ReassemblerMaxInFlight: 50,
ReassemblerTimeout: 2 * time.Second,
StreamBufferQueueSize: 64,
StreamBufferQueueSize: 8192,
StreamBufferConsumers: 0,
}

0 comments on commit 124c8a2

Please sign in to comment.