diff --git a/.chloggen/drosiek-journald-matches.yaml b/.chloggen/drosiek-journald-matches.yaml new file mode 100755 index 000000000000..7953862528a0 --- /dev/null +++ b/.chloggen/drosiek-journald-matches.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: journaldreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add support for `matches` configuration + +# One or more tracking issues related to the change +issues: [20295] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/pkg/stanza/docs/operators/journald_input.md b/pkg/stanza/docs/operators/journald_input.md index bf80c11570a1..d4455268d0a3 100644 --- a/pkg/stanza/docs/operators/journald_input.md +++ b/pkg/stanza/docs/operators/journald_input.md @@ -14,13 +14,15 @@ The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the j | `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | | `directory` | | A directory containing journal files to read entries from. | | `files` | | A list of journal files to read entries from. | -| `units` | | A list of units to read entries from. | -| `priority` | `info` | Filter output by message priorities or priority ranges. | +| `units` | | A list of units to read entries from. See [Multiple filtering options](#multiple-filtering-options) examples, if you want to use it together with `matches` and/or `priority`. | +| `matches` | | A list of matches to read entries from. See [Matches](#matches) and [Multiple filtering options](#multiple-filtering-options) examples. | +| `priority` | `info` | Filter output by message priorities or priority ranges. See [Multiple filtering options](#multiple-filtering-options) examples, if you want to use it together with `units` and/or `matches`. | | `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. | | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource. | ### Example Configurations + ```yaml - type: journald_input units: @@ -33,7 +35,66 @@ The `journald_input` operator will use the `__REALTIME_TIMESTAMP` field of the j - type: journald_input priority: emerg..err ``` -#### Simple journald input + +#### Matches + +The following configuration: + +```yaml +- type: journald_input + matches: + - _SYSTEMD_UNIT: ssh + - _SYSTEMD_UNIT: kubelet + _UID: "1000" +``` + +will be passed to `journalctl` as the following arguments: `journalctl ... _SYSTEMD_UNIT=ssh + _SYSTEMD_UNIT=kubelet _UID=1000`, +which is going to retrieve all entries which match at least one of the following rules: + +- `_SYSTEMD_UNIT` is `ssh` +- `_SYSTEMD_UNIT` is `kubelet` and `_UID` is `1000` + +#### Multiple filtering options + +In case of using multiple following options, conditions between them are logically `AND`ed and within them are logically `OR`ed: + +```text +( priority ) +AND +( units[0] OR units[1] OR units[2] OR ... units[U] ) +AND +( matches[0] OR matches[1] OR matches[2] OR ... matches[M] ) +``` + +Consider the following example: + +```yaml +- type: journald_input + matches: + - _SYSTEMD_UNIT: ssh + - _SYSTEMD_UNIT: kubelet + _UID: "1000" + units: + - kubelet + - systemd + priority: info +``` + +The above configuration will be passed to `journalctl` as the following arguments +`journalctl ... --priority=info --unit=kubelet --unit=systemd _SYSTEMD_UNIT=ssh + _SYSTEMD_UNIT=kubelet _UID=1000`, +which is going to effectively retrieve all entries which matches the following set of rules: + +- `_PRIORITY` is `6`, and +- `_SYSTEMD_UNIT` is `kubelet` or `systemd`, and +- entry matches at least one of the following rules: + + - `_SYSTEMD_UNIT` is `ssh` + - `_SYSTEMD_UNIT` is `kubelet` and `_UID` is `1000` + +Note, that if you use some fields which aren't associated with an entry, the entry will always be filtered out. +Also be careful about using unit name in `matches` configuration, as for the above example, none of the entry for `ssh` and `systemd` is going to be retrieved. + +### Simple journald input Configuration: ```yaml diff --git a/pkg/stanza/operator/input/journald/journald.go b/pkg/stanza/operator/input/journald/journald.go index 0eb69cab6d10..bf149766e489 100644 --- a/pkg/stanza/operator/input/journald/journald.go +++ b/pkg/stanza/operator/input/journald/journald.go @@ -24,6 +24,8 @@ import ( "fmt" "io" "os/exec" + "regexp" + "sort" "strconv" "sync" "time" @@ -60,13 +62,16 @@ func NewConfigWithID(operatorID string) *Config { type Config struct { helper.InputConfig `mapstructure:",squash"` - Directory *string `mapstructure:"directory,omitempty"` - Files []string `mapstructure:"files,omitempty"` - StartAt string `mapstructure:"start_at,omitempty"` - Units []string `mapstructure:"units,omitempty"` - Priority string `mapstructure:"priority,omitempty"` + Directory *string `mapstructure:"directory,omitempty"` + Files []string `mapstructure:"files,omitempty"` + StartAt string `mapstructure:"start_at,omitempty"` + Units []string `mapstructure:"units,omitempty"` + Priority string `mapstructure:"priority,omitempty"` + Matches []MatchConfig `mapstructure:"matches,omitempty"` } +type MatchConfig map[string]string + // Build will build a journald input operator from the supplied configuration func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { inputOperator, err := c.InputConfig.Build(logger) @@ -74,6 +79,25 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { return nil, err } + args, err := c.buildArgs() + if err != nil { + return nil, err + } + + return &Input{ + InputOperator: inputOperator, + newCmd: func(ctx context.Context, cursor []byte) cmd { + if cursor != nil { + args = append(args, "--after-cursor", string(cursor)) + } + return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ... + // journalctl is an executable that is required for this operator to function + }, + json: jsoniter.ConfigFastest, + }, nil +} + +func (c Config) buildArgs() ([]string, error) { args := make([]string, 0, 10) // Export logs in UTC time @@ -108,17 +132,54 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } } - return &Input{ - InputOperator: inputOperator, - newCmd: func(ctx context.Context, cursor []byte) cmd { - if cursor != nil { - args = append(args, "--after-cursor", string(cursor)) - } - return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ... - // journalctl is an executable that is required for this operator to function - }, - json: jsoniter.ConfigFastest, - }, nil + if len(c.Matches) > 0 { + matches, err := c.buildMatchesConfig() + if err != nil { + return nil, err + } + args = append(args, matches...) + } + + return args, nil +} + +func buildMatchConfig(mc MatchConfig) ([]string, error) { + re := regexp.MustCompile("^[_A-Z]+$") + + // Sort keys to be consistent with every run and to be predictable for tests + sortedKeys := make([]string, 0, len(mc)) + for key := range mc { + if !re.MatchString(key) { + return []string{}, fmt.Errorf("'%s' is not a valid Systemd field name", key) + } + sortedKeys = append(sortedKeys, key) + } + sort.Strings(sortedKeys) + + configs := []string{} + for _, key := range sortedKeys { + configs = append(configs, fmt.Sprintf("%s=%s", key, mc[key])) + } + + return configs, nil +} + +func (c Config) buildMatchesConfig() ([]string, error) { + matches := []string{} + + for i, mc := range c.Matches { + if i > 0 { + matches = append(matches, "+") + } + mcs, err := buildMatchConfig(mc) + if err != nil { + return []string{}, err + } + + matches = append(matches, mcs...) + } + + return matches, nil } // Input is an operator that process logs using journald diff --git a/pkg/stanza/operator/input/journald/journald_test.go b/pkg/stanza/operator/input/journald/journald_test.go index 1f37dbcd7c05..12cf93ff7825 100644 --- a/pkg/stanza/operator/input/journald/journald_test.go +++ b/pkg/stanza/operator/input/journald/journald_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -115,3 +116,82 @@ func TestInputJournald(t *testing.T) { require.FailNow(t, "Timed out waiting for entry to be read") } } + +func TestBuildConfig(t *testing.T) { + testCases := []struct { + Name string + Config func(cfg *Config) + Expected []string + ExpectedError string + }{ + { + Name: "empty config", + Config: func(cfg *Config) {}, + Expected: []string{"--utc", "--output=json", "--follow", "--priority", "info"}, + }, + { + Name: "units", + Config: func(cfg *Config) { + cfg.Units = []string{ + "dbus.service", + "user@1000.service", + } + }, + Expected: []string{"--utc", "--output=json", "--follow", "--unit", "dbus.service", "--unit", "user@1000.service", "--priority", "info"}, + }, + { + Name: "matches", + Config: func(cfg *Config) { + cfg.Matches = []MatchConfig{ + { + "_SYSTEMD_UNIT": "dbus.service", + }, + { + "_UID": "1000", + "_SYSTEMD_UNIT": "user@1000.service", + }, + } + }, + Expected: []string{"--utc", "--output=json", "--follow", "--priority", "info", "_SYSTEMD_UNIT=dbus.service", "+", "_SYSTEMD_UNIT=user@1000.service", "_UID=1000"}, + }, + { + Name: "invalid match", + Config: func(cfg *Config) { + cfg.Matches = []MatchConfig{ + { + "-SYSTEMD_UNIT": "dbus.service", + }, + } + }, + ExpectedError: "'-SYSTEMD_UNIT' is not a valid Systemd field name", + }, + { + Name: "units and matches", + Config: func(cfg *Config) { + cfg.Units = []string{"ssh"} + cfg.Matches = []MatchConfig{ + { + "_SYSTEMD_UNIT": "dbus.service", + }, + } + }, + Expected: []string{"--utc", "--output=json", "--follow", "--unit", "ssh", "--priority", "info", "_SYSTEMD_UNIT=dbus.service"}, + }, + } + + for _, tt := range testCases { + t.Run(tt.Name, func(t *testing.T) { + cfg := NewConfigWithID("my_journald_input") + tt.Config(cfg) + args, err := cfg.buildArgs() + + if tt.ExpectedError != "" { + require.Error(t, err) + require.ErrorContains(t, err, tt.ExpectedError) + return + } + require.NoError(t, err) + assert.Equal(t, tt.Expected, args) + }) + } +} diff --git a/receiver/journaldreceiver/README.md b/receiver/journaldreceiver/README.md index e7d7a7363b5f..55c6d79c4eda 100644 --- a/receiver/journaldreceiver/README.md +++ b/receiver/journaldreceiver/README.md @@ -16,8 +16,9 @@ Journald receiver is dependent on `journalctl` binary to be present and must be | `directory` | `/run/log/journal` or `/run/journal` | A directory containing journal files to read entries from | | `files` | | A list of journal files to read entries from | | `start_at` | `end` | At startup, where to start reading logs from the file. Options are beginning or end | -| `units` | `[ssh, kubelet, docker, containerd]` | A list of units to read entries from | -| `priority` | `info` | Filter output by message priorities or priority ranges | +| `units` | | A list of units to read entries from. See [Multiple filtering options](#multiple-filtering-options) examples, if you want to use it together with `matches` and/or `priority`. | +| `matches` | | A list of matches to read entries from. See [Matches](#matches) and [Multiple filtering options](#multiple-filtering-options) examples. | +| `priority` | `info` | Filter output by message priorities or priority ranges. See [Multiple filtering options](#multiple-filtering-options) examples, if you want to use it together with `units` and/or `matches`. | | `storage` | none | The ID of a storage extension to be used to store cursors. Cursors allow the receiver to pick up where it left off in the case of a collector restart. If no storage extension is used, the receiver will manage cursors in memory only. | ### Example Configurations @@ -34,5 +35,60 @@ receivers: priority: info ``` -[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha -[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib +#### Matches + +The following configuration: + +```yaml +- type: journald_input + matches: + - _SYSTEMD_UNIT: ssh + - _SYSTEMD_UNIT: kubelet + _UID: "1000" +``` + +will be passed to `journalctl` as the following arguments: `journalctl ... _SYSTEMD_UNIT=ssh + _SYSTEMD_UNIT=kubelet _UID=1000`, +which is going to retrieve all entries which match at least one of the following rules: + +- `_SYSTEMD_UNIT` is `ssh` +- `_SYSTEMD_UNIT` is `kubelet` and `_UID` is `1000` + +#### Multiple filtering options + +In case of using multiple following options, conditions between them are logically `AND`ed and within them are logically `OR`ed: + +```text +( priority ) +AND +( units[0] OR units[1] OR units[2] OR ... units[U] ) +AND +( matches[0] OR matches[1] OR matches[2] OR ... matches[M] ) +``` + +Consider the following example: + +```yaml +- type: journald_input + matches: + - _SYSTEMD_UNIT: ssh + - _SYSTEMD_UNIT: kubelet + _UID: "1000" + units: + - kubelet + - systemd + priority: info +``` + +The above configuration will be passed to `journalctl` as the following arguments +`journalctl ... --priority=info --unit=kubelet --unit=systemd _SYSTEMD_UNIT=ssh + _SYSTEMD_UNIT=kubelet _UID=1000`, +which is going to effectively retrieve all entries which matches the following set of rules: + +- `_PRIORITY` is `6`, and +- `_SYSTEMD_UNIT` is `kubelet` or `systemd`, and +- entry matches at least one of the following rules: + + - `_SYSTEMD_UNIT` is `ssh` + - `_SYSTEMD_UNIT` is `kubelet` and `_UID` is `1000` + +Note, that if you use some fields which aren't associated with an entry, the entry will always be filtered out. +Also be careful about using unit name in `matches` configuration, as for the above example, none of the entry for `ssh` and `systemd` is going to be retrieved.