Skip to content

Commit

Permalink
Auditd: Added metric for kernel lost events
Browse files Browse the repository at this point in the history
The `auditd` module exports a new metric, `kernel_lost`, that accounts
for the events dropped by the kernel's audit framework
  • Loading branch information
adriansr authored and andrewkroh committed May 28, 2018
1 parent 9bcd996 commit 97b809a
Showing 1 changed file with 77 additions and 12 deletions.
89 changes: 77 additions & 12 deletions auditbeat/module/auditd/audit_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ const (

unicast = "unicast"
multicast = "multicast"

lostEventsUpdateInterval = time.Second * 15
)

var (
auditdMetrics = monitoring.Default.NewRegistry(moduleName)
lostMetric = monitoring.NewInt(auditdMetrics, "lost")
auditdMetrics = monitoring.Default.NewRegistry(moduleName)
reassemblerlostMetric = monitoring.NewInt(auditdMetrics, "reassembler_lost")
kernelLostMetric = monitoring.NewInt(auditdMetrics, "kernel_lost")
)

func init() {
Expand All @@ -48,9 +51,13 @@ func init() {
// does not rely on polling.
type MetricSet struct {
mb.BaseMetricSet
config Config
client *libaudit.AuditClient
log *logp.Logger
config Config
client *libaudit.AuditClient
log *logp.Logger
kernelLost struct {
enabled bool
counter uint32
}
}

// New constructs a new MetricSet.
Expand All @@ -69,7 +76,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.Wrap(err, "failed to create audit client")
}

lostMetric.Set(0)
reassemblerlostMetric.Set(0)
kernelLostMetric.Set(0)

return &MetricSet{
BaseMetricSet: base,
Expand Down Expand Up @@ -104,13 +112,34 @@ func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
return
}

out, err := ms.receiveEvents(reporter.Done())
out, statusC, err := ms.receiveEvents(reporter.Done())
if err != nil {
reporter.Error(err)
ms.log.Errorw("Failure receiving audit events", "error", err)
return
}

if ms.kernelLost.enabled {
go func() {
timer := time.NewTicker(lostEventsUpdateInterval)
defer timer.Stop()
for {
select {
case <-reporter.Done():
return
case <-timer.C:
if seq, err := ms.client.GetStatusAsync(false); err == nil {
ms.log.Debugf("sent async status request seq=%d", seq)
} else {
ms.log.Error("get async status request failed:", err)
}
case status := <-statusC:
ms.updateKernelLostMetric(status.Lost)
}
}
}()
}

for {
select {
case <-reporter.Done():
Expand Down Expand Up @@ -189,6 +218,9 @@ func (ms *MetricSet) initClient() error {
if err != nil {
return errors.Wrap(err, "failed to get audit status")
}
ms.kernelLost.enabled = true
ms.kernelLost.counter = status.Lost

ms.log.Infow("audit status from kernel at start", "audit_status", status)

if status.Enabled == auditLocked {
Expand Down Expand Up @@ -230,27 +262,60 @@ func (ms *MetricSet) initClient() error {
return nil
}

func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.AuditMessage, error) {
func (ms *MetricSet) updateKernelLostMetric(lost uint32) {
if !ms.kernelLost.enabled {
return
}
delta := int64(lost - ms.kernelLost.counter)
if delta >= 0 {
logFn := ms.log.Debugf
if delta > 0 {
logFn = ms.log.Infof
kernelLostMetric.Add(delta)
}
logFn("kernel lost events: %d (total: %d)", delta, lost)
} else {
ms.log.Warnf("kernel lost event counter reset from %d to %d", ms.kernelLost, lost)
}
ms.kernelLost.counter = lost
}

func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.AuditMessage, <-chan *libaudit.AuditStatus, error) {
if err := ms.initClient(); err != nil {
return nil, err
return nil, nil, err
}

out := make(chan []*auparse.AuditMessage, ms.config.StreamBufferQueueSize)
statusC := make(chan *libaudit.AuditStatus, 8)
reassembler, err := libaudit.NewReassembler(int(ms.config.ReassemblerMaxInFlight), ms.config.ReassemblerTimeout, &stream{done, out})
if err != nil {
return nil, errors.Wrap(err, "failed to create Reassembler")
return nil, nil, errors.Wrap(err, "failed to create Reassembler")
}
go maintain(done, reassembler)

go func() {
defer close(out)
defer close(statusC)
defer reassembler.Close()

for {
raw, err := ms.client.Receive(false)
if err != nil {
continue
}
if raw.Type == auparse.AUDIT_GET {
status := &libaudit.AuditStatus{}
if err := status.FromWireFormat([]byte(raw.Data)); err == nil {
select {
case statusC <- status:
default:
ms.log.Debugf("Dropped audit status reply (channel busy)")
}
} else {
ms.log.Error("Decoding status message:", err)
}
continue
}

if filterRecordType(raw.Type) {
continue
Expand All @@ -266,7 +331,7 @@ func (ms *MetricSet) receiveEvents(done <-chan struct{}) (<-chan []*auparse.Audi
}
}()

return out, nil
return out, statusC, nil
}

// maintain periodically evicts timed-out events from the Reassembler. This
Expand Down Expand Up @@ -531,7 +596,7 @@ func (s *stream) ReassemblyComplete(msgs []*auparse.AuditMessage) {
}

func (s *stream) EventsLost(count int) {
lostMetric.Inc()
reassemblerlostMetric.Inc()
}

func hasMulticastSupport() bool {
Expand Down

0 comments on commit 97b809a

Please sign in to comment.