Skip to content

Commit

Permalink
Report network stats for processes on Linux.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkroh committed Sep 13, 2016
1 parent 0f59764 commit 7dd6ba8
Show file tree
Hide file tree
Showing 7 changed files with 471 additions and 32 deletions.
25 changes: 8 additions & 17 deletions metricbeat/module/system/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
package network

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/system"

"github.com/pkg/errors"
"github.com/shirou/gopsutil/net"
Expand All @@ -29,27 +31,16 @@ type MetricSet struct {

// New is a mb.MetricSetFactory that returns a new MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Unpack additional configuration options.
config := struct {
Interfaces []string `config:"interfaces"`
}{}
err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, err
}

var interfaceSet map[string]struct{}
if len(config.Interfaces) > 0 {
interfaceSet = make(map[string]struct{}, len(config.Interfaces))
for _, ifc := range config.Interfaces {
interfaceSet[strings.ToLower(ifc)] = struct{}{}
}
debugf("network io stats will be included for %v", interfaceSet)
systemModule, ok := base.Module().(*system.Module)
if !ok {
return nil, fmt.Errorf("unexpected module type")
}
debugf("network io stats will be included for %v",
systemModule.NetworkInterfaces)

return &MetricSet{
BaseMetricSet: base,
interfaces: interfaceSet,
interfaces: systemModule.NetworkInterfaces,
}, nil
}

Expand Down
59 changes: 59 additions & 0 deletions metricbeat/module/system/process/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package process

import (
"strings"

"github.com/elastic/beats/libbeat/common"
)

func (m *MetricSet) getNetworkInterfaceStats(pid int) (common.MapStr, error) {
proc, err := m.procfs.NewProc(pid)
if err != nil {
return nil, err
}

ns, err := proc.NewNamespaces()
if err != nil {
return nil, err
}

// Only report network metrics if they differ from the overall hosts. We
// check if the process is in a different network namespace.
if net, found := ns["net"]; !found || net.Inode == m.hostNetNamespace {
return nil, nil
}

networkStats, err := proc.NewNetDev()
if err != nil {
return nil, err
}

if len(m.interfaces) > 0 {
// Filter network devices by name.
for i, stats := range networkStats {
name := strings.ToLower(stats.Name)
if _, include := m.interfaces[name]; !include {
networkStats = append(networkStats[:i], networkStats[i+1:]...)
}
}
}

// Report metrics in aggregate (to avoid having an array).
total := networkStats.Total()

return common.MapStr{
"name": total.Name,
"in": common.MapStr{
"errors": total.RxErrors,
"dropped": total.RxDropped,
"bytes": total.RxBytes,
"packets": total.RxPackets,
},
"out": common.MapStr{
"errors": total.TxErrors,
"dropped": total.TxDropped,
"packets": total.TxPackets,
"bytes": total.TxBytes,
},
}, nil
}
74 changes: 61 additions & 13 deletions metricbeat/module/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package process

import (
"fmt"
"path/filepath"
"runtime"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -12,10 +13,11 @@ import (
"github.com/elastic/beats/metricbeat/module/system"

"github.com/elastic/gosigar/cgroup"
"github.com/elastic/gosigar/util"
"github.com/pkg/errors"
)

var debugf = logp.MakeDebug("system-process")
var debugf = logp.MakeDebug("system.process")

func init() {
if err := mb.Registry.AddMetricSet("system", "process", New); err != nil {
Expand All @@ -26,12 +28,20 @@ func init() {
// MetricSet that fetches process metrics.
type MetricSet struct {
mb.BaseMetricSet
stats *ProcStats
cgroup *cgroup.Reader
stats *ProcStats
cgroup *cgroup.Reader
procfs *util.FS
interfaces map[string]struct{}
hostNetNamespace uint32 // Inode of the current process'es net namespace.
}

// New creates and returns a new MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
systemModule, ok := base.Module().(*system.Module)
if !ok {
return nil, fmt.Errorf("unexpected module type")
}

config := struct {
Procs []string `config:"processes"` // collect all processes by default
Cgroups bool `config:"cgroups"`
Expand All @@ -50,25 +60,50 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
ProcStats: true,
Procs: config.Procs,
},
interfaces: systemModule.NetworkInterfaces,
}

err := m.stats.InitProcStats()
if err != nil {
return nil, err
}

if runtime.GOOS == "linux" {
systemModule, ok := base.Module().(*system.Module)
if !ok {
return nil, fmt.Errorf("unexpected module type")
if runtime.GOOS == "linux" && config.Cgroups {
logp.Warn("EXPERIMENTAL: Cgroup is enabled for the system.process MetricSet.")
m.cgroup, err = cgroup.NewReader(systemModule.HostFS, true)
if err != nil {
return nil, errors.Wrap(err, "error initializing cgroup reader")
}

if config.Cgroups {
logp.Warn("EXPERIMENTAL: Cgroup is enabled for the system.process MetricSet.")
m.cgroup, err = cgroup.NewReader(systemModule.HostFS, true)
if err != nil {
return nil, errors.Wrap(err, "error initializing cgroup reader")
}
rootfsMountpoint := "/"
if systemModule.HostFS != "" {
rootfsMountpoint = systemModule.HostFS
}

procfs, err := util.NewFS(filepath.Join(rootfsMountpoint, "proc"))
if err != nil {
return nil, errors.Wrap(err, "error initializing procfs reader")
}

self, err := procfs.Self()
if err != nil {
return nil, errors.Wrap(err, "failed to get self process")
}

ns, err := self.NewNamespaces()
if err != nil {
return nil, errors.Wrap(err, "failed to get namespace info for current process")
}

net, found := ns["net"]
if !found {
debugf("net namespace info wasn't found, process network stats " +
"will not be reported")
return nil, nil
}

m.procfs = &procfs
m.hostNetNamespace = net.Inode
}

return m, nil
Expand Down Expand Up @@ -98,6 +133,19 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
if statsMap := cgroupStatsToMap(stats); statsMap != nil {
proc["cgroup"] = statsMap
}

if m.procfs == nil {
continue
}
network, err := m.getNetworkInterfaceStats(pid)
if err != nil {
debugf("error getting process network stats for pid=%d, %v", pid, err)
continue
}

if network != nil {
proc["network"] = network
}
}
}

Expand Down
28 changes: 26 additions & 2 deletions metricbeat/module/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package system
import (
"flag"
"math"
"strings"
"sync"

"github.com/elastic/beats/metricbeat/mb"
Expand All @@ -23,7 +24,8 @@ func init() {

type Module struct {
mb.BaseModule
HostFS string // Mountpoint of the host's filesystem for use in monitoring inside a container.
HostFS string // Mountpoint of the host's filesystem for use in monitoring inside a container.
NetworkInterfaces map[string]struct{} // Network interface names to report metrics for.
}

func NewModule(base mb.BaseModule) (mb.Module, error) {
Expand All @@ -32,7 +34,29 @@ func NewModule(base mb.BaseModule) (mb.Module, error) {
configureHostFS()
})

return &Module{BaseModule: base, HostFS: *HostFS}, nil
// Unpack additional configuration options.
config := struct {
Interfaces []string `config:"interfaces"`
}{}
err := base.UnpackConfig(&config)
if err != nil {
return nil, err
}

// Build a "set" from the list of interfaces.
var interfaceSet map[string]struct{}
if len(config.Interfaces) > 0 {
interfaceSet = make(map[string]struct{}, len(config.Interfaces))
for _, ifc := range config.Interfaces {
interfaceSet[strings.ToLower(ifc)] = struct{}{}
}
}

return &Module{
BaseModule: base,
HostFS: *HostFS,
NetworkInterfaces: interfaceSet,
}, nil
}

func Round(val float64, roundOn float64, places int) (newVal float64) {
Expand Down
68 changes: 68 additions & 0 deletions vendor/github.com/elastic/gosigar/util/procfs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7dd6ba8

Please sign in to comment.