Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add per-process network stats #62

Merged
merged 3 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions metric/system/network/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,52 @@ import (
sysinfotypes "github.com/elastic/go-sysinfo/types"
)

// MapProcNetCountersWithFilter converts the NetworkCountersInfo to a formatted mapstring,
// and applies a filter to the resulting map. The filter should be an array key values, taken from /proc/PID/net/snmp or /proc/PID/net/netstat
func MapProcNetCountersWithFilter(raw *sysinfotypes.NetworkCountersInfo, filter []string) mapstr.M {
return createMap(raw, filter)
}

// MapProcNetCounters converts the NetworkCountersInfo struct into a MapStr acceptable for sending upstream
func MapProcNetCounters(raw *sysinfotypes.NetworkCountersInfo) mapstr.M {
return createMap(raw, []string{"all"})
}

func createMap(raw *sysinfotypes.NetworkCountersInfo, filter []string) mapstr.M {
eventByProto := mapstr.M{
"ip": combineMap(raw.Netstat.IPExt, raw.SNMP.IP),
"tcp": combineMap(raw.Netstat.TCPExt, raw.SNMP.TCP),
"ip": combineMap(raw.Netstat.IPExt, raw.SNMP.IP, filter),
"tcp": combineMap(raw.Netstat.TCPExt, raw.SNMP.TCP, filter),
"udp": raw.SNMP.UDP,
"udp_lite": raw.SNMP.UDPLite,
"icmp": combineMap(raw.SNMP.ICMPMsg, raw.SNMP.ICMP),
"icmp": combineMap(raw.SNMP.ICMPMsg, raw.SNMP.ICMP, filter),
}

return eventByProto
}

// combineMap concatinates two given maps
func combineMap(map1, map2 map[string]uint64) map[string]interface{} {
func combineMap(map1, map2 map[string]uint64, filter []string) map[string]interface{} {
var compMap = make(map[string]interface{})

for k, v := range map1 {
compMap[k] = checkMaxConn(k, v)
}
for k, v := range map2 {
compMap[k] = checkMaxConn(k, v)
if len(filter) == 0 || filter[0] == "all" {
for k, v := range map1 {
compMap[k] = checkMaxConn(k, v)
}
for k, v := range map2 {
compMap[k] = checkMaxConn(k, v)
}
Comment on lines +53 to +58
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if 2 maps have the same key, do we check twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, these values are only being merged across a given protocol, and in my experience, the kernel won't report the same metric for a given protocol twice, so it should be fine? Not sure how we would recover from that anyway, since it would seemingly be the same metric reported in two different places?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realised that checkMaxConn would just do a type cast, it's not really a big deal to run it twice for the same key. So, the current logic is that the second map has a priority over the first if the keys collide, which I think is fine.

} else {
for _, key := range filter {
if value, ok := map1[key]; ok {
compMap[key] = checkMaxConn(key, value)
}
if value, ok := map2[key]; ok {
compMap[key] = checkMaxConn(key, value)
}
rdner marked this conversation as resolved.
Show resolved Hide resolved

}
}

return compMap
}

Expand Down
60 changes: 60 additions & 0 deletions metric/system/network/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package network

import (
"testing"

"github.com/elastic/go-sysinfo/types"
"github.com/stretchr/testify/require"
)

func TestFilter(t *testing.T) {
exampleData := &types.NetworkCountersInfo{SNMP: types.SNMP{
IP: map[string]uint64{"DefaultTTL": 0x40, "ForwDatagrams": 0x3ef68, "Forwarding": 0x1, "FragCreates": 0x0, "FragFails": 0x0, "FragOKs": 0x0, "InAddrErrors": 0x2,
"InDelivers": 0x132b5d, "InReceives": 0x1904f4, "InUnknownProtos": 0x0, "OutDiscards": 0x0, "OutNoRoutes": 0xe,
"OutRequests": 0x143a7e},
ICMP: map[string]uint64{"InAddrMaskReps": 0x0},
ICMPMsg: map[string]uint64{"InType3": 0x2, "OutType3": 0x85},
TCP: map[string]uint64{"ActiveOpens": 0x63d, "AttemptFails": 0x72, "CurrEstab": 0x9, "EstabResets": 0x54, "InCsumErrors": 0x0, "InErrs": 0x0, "InSegs": 0x13270b,
"MaxConn": 0xffffffffffffffff, "OutRsts": 0x2f6, "OutSegs": 0x111424},
UDP: map[string]uint64{"NoPorts": 0x1, "OutDatagrams": 0x4d5},
UDPLite: map[string]uint64{"SndbufErrors": 0x0},
},
Netstat: types.Netstat{
TCPExt: map[string]uint64{"TCPAbortOnClose": 0x6, "TCPAbortOnData": 0x51, "TCPAbortOnLinger": 0x0, "TCPAbortOnMemory": 0x0,
"TCPAbortOnTimeout": 0x7f, "TCPAckCompressed": 0x3346, "TCPAutoCorking": 0x1063, "TCPBacklogCoalesce": 0x38a, "TCPBacklogDrop": 0x0, "TCPChallengeACK": 0x0, "TCPDSACKIgnoredDubious": 0x0,
"TCPHPHits": 0x99fc8, "TCPHystartDelayCwnd": 0x294, "TCPHystartDelayDetect": 0x3, "TCPHystartTrainCwnd": 0xa1e},
IPExt: map[string]uint64{"InBcastOctets": 0x514d4c, "InBcastPkts": 0x1e999, "InCEPkts": 0x0, "InCsumErrors": 0x0, "InECT0Pkts": 0x0, "InECT1Pkts": 0x0, "InMcastOctets": 0x44a,
"InMcastPkts": 0x12, "InNoECTPkts": 0x1ec6d9, "InNoRoutes": 0x0, "InOctets": 0x50701313, "OutMcastPkts": 0x71, "OutOctets": 0x47f14f8c, "ReasmOverlaps": 0x0},
},
}
// test with no filter
testAll := []string{"all"}
allMap := MapProcNetCountersWithFilter(exampleData, testAll)
require.Equal(t, len(exampleData.SNMP.ICMP)+len(exampleData.SNMP.ICMPMsg), len(allMap["icmp"].(map[string]interface{})))
require.Equal(t, len(exampleData.SNMP.TCP)+len(exampleData.Netstat.TCPExt), len(allMap["tcp"].(map[string]interface{})))

//test With filter
testTwo := []string{"TCPAbortOnClose", "InBcastOctets"}
filteredMap := MapProcNetCountersWithFilter(exampleData, testTwo)
require.Equal(t, 1, len(filteredMap["tcp"].(map[string]interface{})))
require.Equal(t, uint64(0x6), filteredMap["tcp"].(map[string]interface{})["TCPAbortOnClose"])

require.Equal(t, uint64(0x514d4c), filteredMap["ip"].(map[string]interface{})["InBcastOctets"])
}
24 changes: 24 additions & 0 deletions metric/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ import (
"github.com/elastic/elastic-agent-libs/opt"
"github.com/elastic/elastic-agent-libs/transform/typeconv"
"github.com/elastic/elastic-agent-system-metrics/metric"
"github.com/elastic/elastic-agent-system-metrics/metric/system/network"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/elastic/go-sysinfo"
sysinfotypes "github.com/elastic/go-sysinfo/types"
)

// ListStates is a wrapper that returns a list of processess with only the basic PID info filled out.
Expand Down Expand Up @@ -227,6 +230,23 @@ func (procStats *Stats) pidFill(pid int, filter bool) (ProcState, bool, error) {
}
} // end cgroups processor

// network data
if procStats.EnableNetwork {
procHandle, err := sysinfo.Process(pid)
// treat this as a soft error
if err != nil {
procStats.logger.Debugf("error initializing process handler for pid %d while trying to fetch network data: %w", pid, err)
} else {
procNet, ok := procHandle.(sysinfotypes.NetworkCounters)
if ok {
status.Network, err = procNet.NetworkCounters()
if err != nil {
procStats.logger.Debugf("error fetching network counters for process %d: %w", pid, err)
}
}
}
}

if status.CPU.Total.Ticks.Exists() {
status.CPU.Total.Value = opt.FloatWith(metric.Round(float64(status.CPU.Total.Ticks.ValueOr(0))))
}
Expand Down Expand Up @@ -263,6 +283,10 @@ func (procStats *Stats) getProcessEvent(process *ProcState) (mapstr.M, error) {
proc := mapstr.M{}
err := typeconv.Convert(&proc, process)

if procStats.EnableNetwork && process.Network != nil {
proc["network"] = network.MapProcNetCountersWithFilter(process.Network, procStats.NetworkMetrics)
}

return proc, err
}

Expand Down
13 changes: 13 additions & 0 deletions metric/system/process/process_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ type Stats struct {
IncludeTop IncludeTopConfig
CgroupOpts cgroup.ReaderOptions
EnableCgroups bool
EnableNetwork bool
// NetworkMetrics is an allowlist of network metrics,
// the names of which can be found in /proc/PID/net/snmp and /proc/PID/net/netstat
NetworkMetrics []string

skipExtended bool
procRegexps []match.Matcher // List of regular expressions used to whitelist processes.
Expand Down Expand Up @@ -164,6 +168,15 @@ func (procStats *Stats) Init() error {
procStats.Hostfs = resolve.NewTestResolver("/")
}

if procStats.EnableNetwork && procStats.Hostfs.IsSet() {
procStats.logger.Warnf("hostfs has been set to %s, and EnableNetwork has been set, but per-process network counters are currently not supported with an alternate filesystem.", procStats.Hostfs.ResolveHostFS(""))
procStats.EnableNetwork = false
}

if procStats.EnableNetwork && len(procStats.NetworkMetrics) == 0 {
procStats.logger.Warnf("Collecting all network metrics per-process; this will produce a large volume of data.")
}

procStats.ProcsMap = NewProcsTrack()

if len(procStats.Procs) == 0 {
Expand Down
55 changes: 55 additions & 0 deletions metric/system/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,61 @@ func TestGetOne(t *testing.T) {
t.Logf("Proc: %s", procData[0].StringToPrint())
}

func TestNetworkFetch(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("Network data only available on linux")
}
testConfig := Stats{
Procs: []string{".*"},
Hostfs: resolve.NewTestResolver("/"),
CPUTicks: false,
EnableCgroups: false,
EnableNetwork: true,
}

err := testConfig.Init()
require.NoError(t, err)

data, err := testConfig.GetOne(os.Getpid())
require.NoError(t, err)
networkData, ok := data["network"]
require.True(t, ok, "network data not found")
require.NotEmpty(t, networkData)
}

func TestNetworkSkipWithHostfs(t *testing.T) {
testConfig := Stats{
Hostfs: resolve.NewTestResolver("testpath"),
EnableNetwork: true,
}

err := testConfig.Init()
require.NoError(t, err)
require.False(t, testConfig.EnableNetwork)
}

func TestNetworkFilter(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("Network data only available on linux")
}
testConfig := Stats{
Hostfs: resolve.NewTestResolver("/"),
EnableNetwork: true,
NetworkMetrics: []string{"Forwarding"},
}

err := testConfig.Init()
require.NoError(t, err)

data, err := testConfig.GetOne(os.Getpid())
require.NoError(t, err)

_, exists := data.GetValue("network.ip.Forwarding")
require.NoError(t, exists, "filter did not preserve key")
ipMetrics, exists := data.GetValue("network.ip")
require.Equal(t, 1, len(ipMetrics.(map[string]interface{})))
}

func TestFilter(t *testing.T) {
//The logic itself is os-independent, so we'll only test this on the platform least likly to have CI issues
if runtime.GOOS != "linux" {
Expand Down
8 changes: 5 additions & 3 deletions metric/system/process/process_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/opt"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
sysinfotypes "github.com/elastic/go-sysinfo/types"
)

// ProcState is the main struct for process information and metrics.
Expand All @@ -43,9 +44,10 @@ type ProcState struct {
Env mapstr.M `struct:"env,omitempty"`

// Resource Metrics
Memory ProcMemInfo `struct:"memory,omitempty"`
CPU ProcCPUInfo `struct:"cpu,omitempty"`
FD ProcFDInfo `struct:"fd,omitempty"`
Memory ProcMemInfo `struct:"memory,omitempty"`
CPU ProcCPUInfo `struct:"cpu,omitempty"`
FD ProcFDInfo `struct:"fd,omitempty"`
Network *sysinfotypes.NetworkCountersInfo `struct:"-,omitempty"`

// cgroups
Cgroup cgroup.CGStats `struct:"cgroup,omitempty"`
Expand Down