Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RHEL/Centos support to the system/users metricset #16902

Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix detection and logging of some error cases with light modules. {pull}14706[14706]
- Fix imports after PR was merged before rebase. {pull}16756[16756]
- Add dashboard for `redisenterprise` module. {pull}16752[16752]
- Dynamically choose a method for the system/service metricset to support older linux distros. {pull}16902[16902]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a moment I thought that it should say system/users metricset, but is the title of the issue what is wrong. Remember this when forging the commit message! 🙂


*Packetbeat*

Expand Down
5 changes: 4 additions & 1 deletion metricbeat/docs/modules/system.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ metricbeat.modules:
#diskio.include_devices: []

# Filter systemd services by status or sub-status
#service.state_filter: []
#service.state_filter: ["active"]

# Filter systemd services based on a name pattern
#service.pattern_filter: ["ssh*", "nfs*"]
----

[float]
Expand Down
5 changes: 4 additions & 1 deletion metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ metricbeat.modules:
#diskio.include_devices: []

# Filter systemd services by status or sub-status
#service.state_filter: []
#service.state_filter: ["active"]

# Filter systemd services based on a name pattern
#service.pattern_filter: ["ssh*", "nfs*"]

#------------------------------ Aerospike Module ------------------------------
- module: aerospike
Expand Down
5 changes: 4 additions & 1 deletion metricbeat/module/system/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,7 @@
#diskio.include_devices: []

# Filter systemd services by status or sub-status
#service.state_filter: []
#service.state_filter: ["active"]

# Filter systemd services based on a name pattern
#service.pattern_filter: ["ssh*", "nfs*"]
1 change: 1 addition & 0 deletions metricbeat/module/system/service/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ For more information, https://www.freedesktop.org/software/systemd/man/systemd.r
=== Configuration

*`service.state_filter`* - A list of service states to filter by. This can be any of the states or sub-states known to systemd.
*`service.pattern_filter`* - A list of glob patterns to filter service names by. This is an "or" filter, and will report any systemd unit that matches at least one filter pattern.

[float]
=== Dashboard
Expand Down
182 changes: 182 additions & 0 deletions metricbeat/module/system/service/dbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//+build !netbsd

package service

import (
"encoding/xml"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/coreos/go-systemd/v22/dbus"
dbusRaw "github.com/godbus/dbus"
"github.com/pkg/errors"
)

type unitFetcher func(conn *dbus.Conn, states, patterns []string) ([]dbus.UnitStatus, error)

// instrospectForUnitMethods determines what methods are available via dbus for listing systemd units.
// We have a number of functions, some better than others, for getting and filtering unit lists.
// This will attempt to find the most optimal method, and move down to methods that require more work.
func instrospectForUnitMethods() (unitFetcher, error) {
//setup a dbus connection
conn, err := dbusRaw.SystemBusPrivate()
if err != nil {
return nil, errors.Wrap(err, "error getting connection to system bus")
}

auth := dbusRaw.AuthExternal(strconv.Itoa(os.Getuid()))
err = conn.Auth([]dbusRaw.Auth{auth})
if err != nil {
return nil, errors.Wrap(err, "error authenticating")
}

err = conn.Hello()
if err != nil {
return nil, errors.Wrap(err, "error in Hello")
}

var props string

//call "introspect" on the systemd1 path to see what ListUnit* methods are available
obj := conn.Object("org.freedesktop.systemd1", dbusRaw.ObjectPath("/org/freedesktop/systemd1"))
err = obj.Call("org.freedesktop.DBus.Introspectable.Introspect", 0).Store(&props)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting!

if err != nil {
return nil, errors.Wrap(err, "error calling dbus")
}

unitMap, err := parseXMLAndReturnMethods(props)
if err != nil {
return nil, errors.Wrap(err, "error handling XML")
}

//return a function callback ordered by desirability
if _, ok := unitMap["ListUnitsByPatterns"]; ok {
return listUnitsByPatternWrapper, nil
} else if _, ok := unitMap["ListUnitsFiltered"]; ok {
return listUnitsFilteredWrapper, nil
} else if _, ok := unitMap["ListUnits"]; ok {
return listUnitsWrapper, nil
}
return nil, fmt.Errorf("no supported list Units function: %v", unitMap)
}

func parseXMLAndReturnMethods(str string) (map[string]bool, error) {

type Method struct {
Name string `xml:"name,attr"`
}

type Iface struct {
Name string `xml:"name,attr"`
Method []Method `xml:"method"`
}

type IntrospectData struct {
XMLName xml.Name `xml:"node"`
Interface []Iface `xml:"interface"`
}

methods := IntrospectData{}

err := xml.Unmarshal([]byte(str), &methods)
if err != nil {
return nil, errors.Wrap(err, "error unmarshalling XML")
}

if len(methods.Interface) == 0 {
return nil, errors.Wrap(err, "no methods found on introspect")
}
methodMap := make(map[string]bool)
for _, iface := range methods.Interface {
for _, method := range iface.Method {
if strings.Contains(method.Name, "ListUnits") {
methodMap[method.Name] = true
}
}
}

return methodMap, nil
}

// listUnitsByPatternWrapper is a bare wrapper for the unitFetcher type
func listUnitsByPatternWrapper(conn *dbus.Conn, states, patterns []string) ([]dbus.UnitStatus, error) {
return conn.ListUnitsByPatterns(states, patterns)
}

//listUnitsFilteredWrapper wraps the dbus ListUnitsFiltered method
func listUnitsFilteredWrapper(conn *dbus.Conn, states, patterns []string) ([]dbus.UnitStatus, error) {
units, err := conn.ListUnitsFiltered(states)
if err != nil {
return nil, errors.Wrap(err, "ListUnitsFiltered error")
}

return matchUnitPatterns(patterns, units)
}

// listUnitsWrapper wraps the dbus ListUnits method
func listUnitsWrapper(conn *dbus.Conn, states, patterns []string) ([]dbus.UnitStatus, error) {
units, err := conn.ListUnits()
if err != nil {
return nil, errors.Wrap(err, "ListUnits error")
}
if len(patterns) > 0 {
units, err = matchUnitPatterns(patterns, units)
if err != nil {
return nil, errors.Wrap(err, "error matching unit patterns")
}
}

if len(states) > 0 {
var finalUnits []dbus.UnitStatus
for _, unit := range units {
for _, state := range states {
if unit.LoadState == state || unit.ActiveState == state || unit.SubState == state {
finalUnits = append(finalUnits, unit)
break
}
}
}
return finalUnits, nil
}

return units, nil
}

// matchUnitPatterns returns a list of units that match the pattern list.
// This algo, including filepath.Match, is designed to (somewhat) emulate the behavior of ListUnitsByPatterns, which uses `fnmatch`.
func matchUnitPatterns(patterns []string, units []dbus.UnitStatus) ([]dbus.UnitStatus, error) {
var matchUnits []dbus.UnitStatus
for _, unit := range units {
for _, pattern := range patterns {
match, err := filepath.Match(pattern, unit.Name)
if err != nil {
return nil, errors.Wrapf(err, "error matching with pattern %s", pattern)
}
if match {
matchUnits = append(matchUnits, unit)
break
}
}
}
return matchUnits, nil
}
17 changes: 13 additions & 4 deletions metricbeat/module/system/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (

// Config stores the config object
type Config struct {
StateFilter []string `config:"service.state_filter"`
StateFilter []string `config:"service.state_filter"`
PatternFilter []string `config:"service.pattern_filter"`
}

// init registers the MetricSet with the central registry as soon as the program
Expand All @@ -47,8 +48,9 @@ func init() {
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
conn *dbus.Conn
cfg Config
conn *dbus.Conn
cfg Config
unitList unitFetcher
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
Expand All @@ -66,18 +68,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.Wrap(err, "error connecting to dbus")
}

unitFunction, err := instrospectForUnitMethods()
if err != nil {
return nil, errors.Wrap(err, "error finding ListUnits Method")
}

return &MetricSet{
BaseMetricSet: base,
conn: conn,
cfg: config,
unitList: unitFunction,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
units, err := m.conn.ListUnitsByPatterns(m.cfg.StateFilter, []string{"*.service"})

units, err := m.unitList(m.conn, m.cfg.StateFilter, append([]string{"*.service"}, m.cfg.PatternFilter...))
if err != nil {
return errors.Wrap(err, "error getting list of running units")
}
Expand Down
5 changes: 4 additions & 1 deletion x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ metricbeat.modules:
#diskio.include_devices: []

# Filter systemd services by status or sub-status
#service.state_filter: []
#service.state_filter: ["active"]

# Filter systemd services based on a name pattern
#service.pattern_filter: ["ssh*", "nfs*"]

#------------------------------- Activemq Module -------------------------------
- module: activemq
Expand Down