Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Auditbeat] Report process errors #9693

Merged
merged 4 commits into from
Jan 2, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 123 additions & 71 deletions x-pack/auditbeat/module/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (

eventTypeState = "state"
eventTypeEvent = "event"
eventTypeError = "error"
)

type eventAction uint8
Expand All @@ -44,6 +45,7 @@ const (
eventActionExistingProcess eventAction = iota
eventActionProcessStarted
eventActionProcessStopped
eventActionProcessError
)

func (action eventAction) String() string {
Expand All @@ -54,6 +56,8 @@ func (action eventAction) String() string {
return "process_started"
case eventActionProcessStopped:
return "process_stopped"
case eventActionProcessError:
return "process_error"
default:
return ""
}
Expand All @@ -78,29 +82,30 @@ type MetricSet struct {
suppressPermissionWarnings bool
}

// ProcessInfo wraps the process information and implements cache.Cacheable.
type ProcessInfo struct {
types.ProcessInfo
// Process represents information about a process.
type Process struct {
Info types.ProcessInfo
Error error
}

// Hash creates a hash for ProcessInfo.
func (pInfo ProcessInfo) Hash() uint64 {
// Hash creates a hash for Process.
func (p Process) Hash() uint64 {
h := xxhash.New64()
h.WriteString(strconv.Itoa(pInfo.PID))
h.WriteString(pInfo.StartTime.String())
h.WriteString(strconv.Itoa(p.Info.PID))
h.WriteString(p.Info.StartTime.String())
return h.Sum64()
}

func (pInfo ProcessInfo) toMapStr() common.MapStr {
func (p Process) toMapStr() common.MapStr {
return common.MapStr{
// https://github.com/elastic/ecs#-process-fields
"name": pInfo.Name,
"args": pInfo.Args,
"pid": pInfo.PID,
"ppid": pInfo.PPID,
"working_directory": pInfo.CWD,
"executable": pInfo.Exe,
"start": pInfo.StartTime,
"name": p.Info.Name,
"args": p.Info.Args,
"pid": p.Info.PID,
"ppid": p.Info.PPID,
"working_directory": p.Info.CWD,
"executable": p.Info.Exe,
"start": p.Info.StartTime,
}
}

Expand Down Expand Up @@ -142,6 +147,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
ms.log.Debug("No state timestamp found")
}

if os.Geteuid() != 0 {
ms.log.Warn("Running as non-root user, will likely not report all processes.")
}

return ms, nil
}

Expand Down Expand Up @@ -181,25 +190,30 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error {
ms.lastState = time.Now()
}

processInfos, err := ms.getProcessInfos()
processes, err := ms.getProcesses()
if err != nil {
return errors.Wrap(err, "failed to get process infos")
}
ms.log.Debugf("Found %v processes", len(processInfos))
ms.log.Debugf("Found %v processes", len(processes))

stateID, err := uuid.NewV4()
if err != nil {
return errors.Wrap(err, "error generating state ID")
}
for _, pInfo := range processInfos {
event := processEvent(pInfo, eventTypeState, eventActionExistingProcess)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
for _, p := range processes {
if p.Error == nil {
event := processEvent(p, eventTypeState, eventActionExistingProcess)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
} else {
ms.log.Warn(p.Error)
report.Event(processEvent(p, eventTypeError, eventActionProcessError))
}
}

if ms.cache != nil {
// This will initialize the cache with the current processes
ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos))
ms.cache.DiffAndUpdateCache(convertToCacheable(processes))
}

// Save time so we know when to send the state again (config.StatePeriod)
Expand All @@ -217,39 +231,60 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error {

// reportChanges detects and reports any changes to processes on this system since the last call.
func (ms *MetricSet) reportChanges(report mb.ReporterV2) error {
processInfos, err := ms.getProcessInfos()
processes, err := ms.getProcesses()
if err != nil {
return errors.Wrap(err, "failed to get process infos")
return errors.Wrap(err, "failed to get processes")
}
ms.log.Debugf("Found %v processes", len(processInfos))
ms.log.Debugf("Found %v processes", len(processes))

started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos))
started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processes))

for _, pInfo := range started {
report.Event(processEvent(pInfo.(*ProcessInfo), eventTypeEvent, eventActionProcessStarted))
for _, cacheValue := range started {
p := cacheValue.(*Process)

if p.Error == nil {
report.Event(processEvent(p, eventTypeEvent, eventActionProcessStarted))
} else {
ms.log.Warn(p.Error)
report.Event(processEvent(p, eventTypeError, eventActionProcessError))
}
}

for _, pInfo := range stopped {
report.Event(processEvent(pInfo.(*ProcessInfo), eventTypeEvent, eventActionProcessStopped))
for _, cacheValue := range stopped {
p := cacheValue.(*Process)

if p.Error == nil {
report.Event(processEvent(p, eventTypeEvent, eventActionProcessStopped))
}
}

return nil
}

func processEvent(pInfo *ProcessInfo, eventType string, action eventAction) mb.Event {
return mb.Event{
func processEvent(process *Process, eventType string, action eventAction) mb.Event {
event := mb.Event{
RootFields: common.MapStr{
"event": common.MapStr{
"kind": eventType,
"action": action.String(),
},
"process": pInfo.toMapStr(),
"message": processMessage(pInfo, action),
"process": process.toMapStr(),
"message": processMessage(process, action),
},
}

if process.Error != nil {
event.RootFields.Put("error.message", process.Error.Error())
}

return event
}

func processMessage(pInfo *ProcessInfo, action eventAction) string {
func processMessage(process *Process, action eventAction) string {
if process.Error != nil {
return fmt.Sprintf("ERROR for PID %d: %v", process.Info.PID, process.Error)
}

var actionString string
switch action {
case eventActionProcessStarted:
Expand All @@ -261,77 +296,94 @@ func processMessage(pInfo *ProcessInfo, action eventAction) string {
}

return fmt.Sprintf("Process %v (PID: %d) %v",
pInfo.Name, pInfo.PID, actionString)
process.Info.Name, process.Info.PID, actionString)
}

func convertToCacheable(processInfos []*ProcessInfo) []cache.Cacheable {
c := make([]cache.Cacheable, 0, len(processInfos))
func convertToCacheable(processes []*Process) []cache.Cacheable {
c := make([]cache.Cacheable, 0, len(processes))

for _, p := range processInfos {
for _, p := range processes {
c = append(c, p)
}

return c
}

func (ms *MetricSet) getProcessInfos() ([]*ProcessInfo, error) {
func (ms *MetricSet) getProcesses() ([]*Process, error) {
// TODO: Implement Processes() in go-sysinfo
// e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41
pids, err := process.Pids()
if err != nil {
return nil, errors.Wrap(err, "failed to fetch the list of PIDs")
}

var processInfos []*ProcessInfo

var processes []*Process
for _, pid := range pids {
process, err := sysinfo.Process(pid)
var process *Process

sysinfoProc, err := sysinfo.Process(pid)
if err != nil {
if os.IsNotExist(err) {
// Skip - process probably just terminated since our call
// to Pids()
continue
}
return nil, errors.Wrap(err, "failed to load process")
}

pInfo, err := process.Info()
if err != nil {
if os.IsNotExist(err) {
// Skip - process probably just terminated since our call
// to Pids()
continue
// Record what we can and continue
process = &Process{
Info: types.ProcessInfo{
PID: pid,
},
Error: errors.Wrapf(err, "failed to load process with PID %d", pid),
}
} else {
pInfo, err := sysinfoProc.Info()
if err == nil {
process = &Process{
Info: pInfo,
}
} else {
if os.IsNotExist(err) {
// Skip - process probably just terminated since our call
// to Pids()
continue
}

if os.Geteuid() != 0 {
if os.IsPermission(err) || runtime.GOOS == "darwin" {
/*
Running as non-root, permission issues when trying to access other user's private
process information are expected.

if os.Geteuid() != 0 {
if os.IsPermission(err) || runtime.GOOS == "darwin" {
/*
Running as non-root, permission issues when trying to access other user's private
process information are expected.
Unfortunately, for darwin os.IsPermission() does not
work because it is a custom error created using errors.New() in
getProcTaskAllInfo() in go-sysinfo/providers/darwin/process_darwin_amd64.go

Unfortunately, for darwin os.IsPermission() does not
work because it is a custom error created using errors.New() in
getProcTaskAllInfo() in go-sysinfo/providers/darwin/process_darwin_amd64.go
TODO: Fix go-sysinfo to have better error for darwin.
*/
if !ms.suppressPermissionWarnings {
ms.log.Warnf("Failed to load process information for PID %d as non-root user. "+
"Will suppress further errors of this kind. Error: %v", pid, err)

TODO: Fix go-sysinfo to have better error for darwin.
*/
if !ms.suppressPermissionWarnings {
ms.log.Warnf("Failed to load process information for PID %d as non-root user. "+
"Will suppress further errors of this kind. Error: %v", pid, err)
// Only warn once at the start of Auditbeat.
ms.suppressPermissionWarnings = true
}

// Only warn once at the start of Auditbeat.
ms.suppressPermissionWarnings = true
//continue
}
}

continue
// Record what we can and continue
process = &Process{
Info: pInfo,
Error: errors.Wrapf(err, "failed to load process information for PID %d", pid),
}
process.Info.PID = pid // in case pInfo did not contain it
}

return nil, errors.Wrap(err, "failed to load process information")
}

processInfos = append(processInfos, &ProcessInfo{pInfo})
processes = append(processes, process)
}

return processInfos, nil
return processes, nil
}