diff --git a/.gitignore b/.gitignore index 57546893fb4..476cfd50764 100644 --- a/.gitignore +++ b/.gitignore @@ -45,7 +45,6 @@ fleet.enc.lock # Files generated with the bump version automations *.bck - # agent build/ elastic-agent diff --git a/NOTICE.txt b/NOTICE.txt index 7bc5103d040..cdd71e1a34f 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1273,11 +1273,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-libs -Version: v0.2.6 +Version: v0.2.15 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.2.6/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.2.15/LICENSE: Apache License Version 2.0, January 2004 diff --git a/changelog/fragments/1669236059-Capture-stdout-stderr-of-all-spawned-components-to-simplify-logging.yaml b/changelog/fragments/1669236059-Capture-stdout-stderr-of-all-spawned-components-to-simplify-logging.yaml new file mode 100644 index 00000000000..8dfa6a9aa2f --- /dev/null +++ b/changelog/fragments/1669236059-Capture-stdout-stderr-of-all-spawned-components-to-simplify-logging.yaml @@ -0,0 +1,31 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Capture stdout/stderr of all spawned components and adjust default log level to info for all components + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +#description: + +# Affected component; a word indicating the component this changeset affects. +component: + +# PR number; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 1702 + +# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: 221 diff --git a/go.mod b/go.mod index df1845dff01..4d44c91ae45 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/elastic/e2e-testing v1.99.2-0.20220117192005-d3365c99b9c4 github.com/elastic/elastic-agent-autodiscover v0.2.1 github.com/elastic/elastic-agent-client/v7 v7.0.0-20220804181728-b0328d2fe484 - github.com/elastic/elastic-agent-libs v0.2.6 + github.com/elastic/elastic-agent-libs v0.2.15 github.com/elastic/elastic-agent-system-metrics v0.4.4 github.com/elastic/go-licenser v0.4.0 github.com/elastic/go-sysinfo v1.8.1 diff --git a/go.sum b/go.sum index 73ded2d2cf3..ac08a20814c 100644 --- a/go.sum +++ b/go.sum @@ -387,8 +387,8 @@ github.com/elastic/elastic-agent-autodiscover v0.2.1/go.mod h1:gPnzzfdYNdgznAb+i github.com/elastic/elastic-agent-client/v7 v7.0.0-20220804181728-b0328d2fe484 h1:uJIMfLgCenJvxsVmEjBjYGxt0JddCgw2IxgoNfcIXOk= github.com/elastic/elastic-agent-client/v7 v7.0.0-20220804181728-b0328d2fe484/go.mod h1:fkvyUfFwyAG5OnMF0h+FV9sC0Xn9YLITwQpSuwungQs= github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= -github.com/elastic/elastic-agent-libs v0.2.6 h1:DpcUcCVYZ7lNtHLUlyT1u/GtGAh49wpL15DTH7+8O5o= -github.com/elastic/elastic-agent-libs v0.2.6/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= +github.com/elastic/elastic-agent-libs v0.2.15 h1:hdAbrZZ2mCPcQLRCE3E8xw3mHKl8HFMt36w7jan/XGo= +github.com/elastic/elastic-agent-libs v0.2.15/go.mod h1:0J9lzJh+BjttIiVjYDLncKYCEWUUHiiqnuI64y6C6ss= github.com/elastic/elastic-agent-system-metrics v0.4.4 h1:Br3S+TlBhijrLysOvbHscFhgQ00X/trDT5VEnOau0E0= github.com/elastic/elastic-agent-system-metrics v0.4.4/go.mod h1:tF/f9Off38nfzTZHIVQ++FkXrDm9keFhFpJ+3pQ00iI= github.com/elastic/elastic-package v0.32.1/go.mod h1:l1fEnF52XRBL6a5h6uAemtdViz2bjtjUtgdQcuRhEAY= diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 8aa7f58d1ca..5d05dfc737a 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -58,7 +58,7 @@ var ( supportedBeatsComponents = []string{"filebeat", "metricbeat", "apm-server", "auditbeat", "cloudbeat", "heartbeat", "osquerybeat", "packetbeat"} ) -// Beats monitor is providing V1 monitoring support. +// BeatsMonitor is providing V1 monitoring support for metrics and logs for endpoint-security only. type BeatsMonitor struct { enabled bool // feature flag disabling whole v1 monitoring story config *monitoringConfig @@ -178,21 +178,10 @@ func (b *BeatsMonitor) EnrichArgs(unit, binary string, args []string) []string { } } - loggingPath := loggingPath(unit, b.operatingSystem) - if loggingPath != "" { + if !b.config.C.LogMetrics { appendix = append(appendix, - "-E", "logging.files.path="+filepath.Dir(loggingPath), - "-E", "logging.files.name="+filepath.Base(loggingPath), - "-E", "logging.files.keepfiles=7", - "-E", "logging.files.permission=0640", - "-E", "logging.files.interval=1h", + "-E", "logging.metrics.enabled=false", ) - - if !b.config.C.LogMetrics { - appendix = append(appendix, - "-E", "logging.metrics.enabled=false", - ) - } } return append(args, appendix...) @@ -291,24 +280,21 @@ func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentIDToBinary map[string]string, monitoringOutput string) error { monitoringNamespace := b.monitoringNamespace() - //fixedAgentName := strings.ReplaceAll(agentName, "-", "_") logsDrop := filepath.Dir(loggingPath("unit", b.operatingSystem)) streams := []interface{}{ map[string]interface{}{ - idKey: "filestream-monitoring-agent", - // "data_stream" is not used when creating an Input on Filebeat - "data_stream": map[string]interface{}{ - "type": "filestream", - "dataset": "elastic_agent", - "namespace": monitoringNamespace, - }, + idKey: "filestream-monitoring-agent", "type": "filestream", "paths": []interface{}{ filepath.Join(logsDrop, agentName+"-*.ndjson"), filepath.Join(logsDrop, agentName+"-watcher-*.ndjson"), }, - "index": fmt.Sprintf("logs-elastic_agent-%s", monitoringNamespace), + "data_stream": map[string]interface{}{ + "type": "logs", + "dataset": "elastic_agent", + "namespace": monitoringNamespace, + }, "close": map[string]interface{}{ "on_state_change": map[string]interface{}{ "inactive": "5m", @@ -325,133 +311,86 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentIDTo }, }, "processors": []interface{}{ + // copy original dataset so we can drop the dataset field map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "data_stream", - "fields": map[string]interface{}{ - "type": "logs", - "dataset": "elastic_agent", - "namespace": monitoringNamespace, - }, - }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "event", - "fields": map[string]interface{}{ - "dataset": "elastic_agent", - }, - }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "elastic_agent", - "fields": map[string]interface{}{ - "id": b.agentInfo.AgentID(), - "version": b.agentInfo.Version(), - "snapshot": b.agentInfo.Snapshot(), - }, - }, - }, - map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "agent", - "fields": map[string]interface{}{ - "id": b.agentInfo.AgentID(), + "copy_fields": map[string]interface{}{ + "fields": []interface{}{ + map[string]interface{}{ + "from": "data_stream.dataset", + "to": "data_stream.dataset_original", + }, }, }, }, + // drop the dataset field so following copy_field can copy to it map[string]interface{}{ "drop_fields": map[string]interface{}{ "fields": []interface{}{ - "ecs.version", //coming from logger, already added by libbeat + "data_stream.dataset", }, - "ignore_missing": true, - }, - }}, - }, - } - for unit, binaryName := range componentIDToBinary { - if !isSupportedBinary(binaryName) { - continue - } - - fixedBinaryName := strings.ReplaceAll(binaryName, "-", "_") - name := strings.ReplaceAll(unit, "-", "_") // conform with index naming policy - logFile := loggingPath(unit, b.operatingSystem) - streams = append(streams, map[string]interface{}{ - idKey: "filestream-monitoring-" + name, - "data_stream": map[string]interface{}{ - // "data_stream" is not used when creating an Input on Filebeat - "type": "filestream", - "dataset": fmt.Sprintf("elastic_agent.%s", fixedBinaryName), - "namespace": monitoringNamespace, - }, - "type": "filestream", - "index": fmt.Sprintf("logs-elastic_agent.%s-%s", fixedBinaryName, monitoringNamespace), - "paths": []interface{}{logFile, logFile + "*"}, - "close": map[string]interface{}{ - "on_state_change": map[string]interface{}{ - "inactive": "5m", - }, - }, - "parsers": []interface{}{ - map[string]interface{}{ - "ndjson": map[string]interface{}{ - "message_key": "message", - "overwrite_keys": true, - "add_error_key": true, - "target": "", }, }, - }, - "processors": []interface{}{ + // copy component.dataset as the real dataset map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "data_stream", - "fields": map[string]interface{}{ - "type": "logs", - "dataset": fmt.Sprintf("elastic_agent.%s", fixedBinaryName), - "namespace": monitoringNamespace, + "copy_fields": map[string]interface{}{ + "fields": []interface{}{ + map[string]interface{}{ + "from": "component.dataset", + "to": "data_stream.dataset", + }, }, + "fail_on_error": false, + "ignore_missing": true, }, }, + // possible it's a log message from agent itself (doesn't have component.dataset) map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "event", - "fields": map[string]interface{}{ - "dataset": fmt.Sprintf("elastic_agent.%s", fixedBinaryName), + "copy_fields": map[string]interface{}{ + "fields": []interface{}{ + map[string]interface{}{ + "from": "data_stream.dataset_original", + "to": "data_stream.dataset", + }, }, + "fail_on_error": false, }, }, + // drop the original dataset copied and the event.dataset (as it will be updated) map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "elastic_agent", - "fields": map[string]interface{}{ - "id": b.agentInfo.AgentID(), - "version": b.agentInfo.Version(), - "snapshot": b.agentInfo.Snapshot(), + "drop_fields": map[string]interface{}{ + "fields": []interface{}{ + "data_stream.dataset_original", + "event.dataset", }, }, }, + // update event.dataset with the now used data_stream.dataset map[string]interface{}{ - "add_fields": map[string]interface{}{ - "target": "agent", - "fields": map[string]interface{}{ - "id": b.agentInfo.AgentID(), + "copy_fields": map[string]interface{}{ + "fields": []interface{}{ + map[string]interface{}{ + "from": "data_stream.dataset", + "to": "event.dataset", + }, }, }, }, + // coming from logger, added by agent (drop) map[string]interface{}{ "drop_fields": map[string]interface{}{ "fields": []interface{}{ - "ecs.version", //coming from logger, already added by libbeat + "ecs.version", }, "ignore_missing": true, }, }, - }, - }) + // adjust destination data_stream based on the data_stream fields + map[string]interface{}{ + "add_formatted_index": map[string]interface{}{ + "index": "%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}", + }, + }}, + }, } inputs := []interface{}{ @@ -460,10 +399,7 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, componentIDTo "name": "filestream-monitoring-agent", "type": "filestream", useOutputKey: monitoringOutput, - "data_stream": map[string]interface{}{ - "namespace": monitoringNamespace, - }, - "streams": streams, + "streams": streams, }, } inputsNode, found := cfg[inputsKey] diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index 2575a35d5f1..405a4329db5 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -12,14 +12,16 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" "time" - "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" - "github.com/elastic/elastic-agent/pkg/utils" - "github.com/elastic/elastic-agent-client/v7/pkg/client" + + "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/core/process" + "github.com/elastic/elastic-agent/pkg/utils" ) type actionMode int @@ -50,6 +52,7 @@ type procState struct { // CommandRuntime provides the command runtime for running a component as a subprocess. type CommandRuntime struct { + logger *logger.Logger current component.Component monitor MonitoringManager @@ -67,7 +70,7 @@ type CommandRuntime struct { } // NewCommandRuntime creates a new command runtime for the provided component. -func NewCommandRuntime(comp component.Component, monitor MonitoringManager) (ComponentRuntime, error) { +func NewCommandRuntime(comp component.Component, logger *logger.Logger, monitor MonitoringManager) (ComponentRuntime, error) { c := &CommandRuntime{ current: comp, monitor: monitor, @@ -82,6 +85,11 @@ func NewCommandRuntime(comp component.Component, monitor MonitoringManager) (Com if cmdSpec == nil { return nil, errors.New("must have command defined in specification") } + c.logger = logger.With("component", map[string]interface{}{ + "id": comp.ID, + "type": c.getSpecType(), + "binary": c.getSpecBinaryName(), + }) return c, nil } @@ -306,7 +314,7 @@ func (c *CommandRuntime) start(comm Communicator) error { proc, err := process.Start(path, process.WithArgs(args), process.WithEnv(env), - process.WithCmdOptions(attachOutErr, dirPath(workDir))) + process.WithCmdOptions(attachOutErr(c.current, c.getCommandSpec(), c.getSpecType(), c.getSpecBinaryName()), dirPath(workDir))) if err != nil { return err } @@ -452,10 +460,19 @@ func (c *CommandRuntime) getCommandSpec() *component.CommandSpec { return nil } -func attachOutErr(cmd *exec.Cmd) error { - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - return nil +func attachOutErr(comp component.Component, cmdSpec *component.CommandSpec, typeStr string, binaryName string) process.CmdOption { + return func(cmd *exec.Cmd) error { + dataset := fmt.Sprintf("elastic_agent.%s", strings.ReplaceAll(strings.ReplaceAll(comp.ID, "-", "_"), "/", "_")) + logger := logger.NewWithoutConfig("").With("component", map[string]interface{}{ + "id": comp.ID, + "type": typeStr, + "binary": binaryName, + "dataset": dataset, + }) + cmd.Stdout = newLogWriter(logger.Core(), cmdSpec.Log) + cmd.Stderr = newLogWriter(logger.Core(), cmdSpec.Log) + return nil + } } func dirPath(path string) process.CmdOption { diff --git a/pkg/component/runtime/log_writer.go b/pkg/component/runtime/log_writer.go new file mode 100644 index 00000000000..6825769f364 --- /dev/null +++ b/pkg/component/runtime/log_writer.go @@ -0,0 +1,171 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "bytes" + "encoding/json" + "errors" + "strings" + "time" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/elastic/elastic-agent/pkg/component" +) + +type zapcoreWriter interface { + Write(zapcore.Entry, []zapcore.Field) error +} + +// logWriter is an `io.Writer` that takes lines and passes them through the logger. +// +// `Write` handles parsing lines as either ndjson or plain text. +type logWriter struct { + loggerCore zapcoreWriter + logCfg component.CommandLogSpec + remainder []byte +} + +func newLogWriter(core zapcoreWriter, logCfg component.CommandLogSpec) *logWriter { + return &logWriter{ + loggerCore: core, + logCfg: logCfg, + } +} + +func (r *logWriter) Write(p []byte) (int, error) { + if len(p) == 0 { + // nothing to do + return 0, nil + } + offset := 0 + for { + idx := bytes.IndexByte(p[offset:], '\n') + if idx < 0 { + // not all used add to remainder to be used on next call + r.remainder = append(r.remainder, p[offset:]...) + return len(p), nil + } + + var line []byte + if r.remainder != nil { + line = r.remainder + r.remainder = nil + line = append(line, p[offset:offset+idx]...) + } else { + line = append(line, p[offset:offset+idx]...) + } + offset += idx + 1 + // drop '\r' from line (needed for Windows) + if len(line) > 0 && line[len(line)-1] == '\r' { + line = line[0 : len(line)-1] + } + if len(line) == 0 { + // empty line + continue + } + str := strings.TrimSpace(string(line)) + // try to parse line as JSON + if str[0] == '{' && r.handleJSON(str) { + // handled as JSON + continue + } + // considered standard text being it's not JSON, log at basic info level + _ = r.loggerCore.Write(zapcore.Entry{ + Level: zapcore.InfoLevel, + Time: time.Now(), + Message: str, + }, nil) + } +} + +func (r *logWriter) handleJSON(line string) bool { + var evt map[string]interface{} + if err := json.Unmarshal([]byte(line), &evt); err != nil { + return false + } + lvl := getLevel(evt, r.logCfg.LevelKey) + ts := getTimestamp(evt, r.logCfg.TimeKey, r.logCfg.TimeFormat) + msg := getMessage(evt, r.logCfg.MessageKey) + fields := getFields(evt, r.logCfg.IgnoreKeys) + _ = r.loggerCore.Write(zapcore.Entry{ + Level: lvl, + Time: ts, + Message: msg, + }, fields) + return true +} + +func getLevel(evt map[string]interface{}, key string) zapcore.Level { + lvl := zapcore.InfoLevel + err := unmarshalLevel(&lvl, getStrVal(evt, key)) + if err == nil { + delete(evt, key) + } + return lvl +} + +func unmarshalLevel(lvl *zapcore.Level, val string) error { + if val == "" { + return errors.New("empty val") + } else if val == "trace" { + // zap doesn't handle trace level we cast to debug + *lvl = zapcore.DebugLevel + return nil + } + return lvl.UnmarshalText([]byte(val)) +} + +func getMessage(evt map[string]interface{}, key string) string { + msg := getStrVal(evt, key) + if msg != "" { + delete(evt, key) + } + return msg +} + +func getTimestamp(evt map[string]interface{}, key string, format string) time.Time { + t, err := time.Parse(format, getStrVal(evt, key)) + if err == nil { + delete(evt, key) + return t + } + return time.Now() +} + +func getFields(evt map[string]interface{}, ignore []string) []zapcore.Field { + fields := make([]zapcore.Field, 0, len(evt)) + for k, v := range evt { + if len(ignore) > 0 && contains(ignore, k) { + // ignore field + continue + } + fields = append(fields, zap.Any(k, v)) + } + return fields +} + +func getStrVal(evt map[string]interface{}, key string) string { + raw, ok := evt[key] + if !ok { + return "" + } + str, ok := raw.(string) + if !ok { + return "" + } + return str +} + +func contains(s []string, val string) bool { + for _, v := range s { + if v == val { + return true + } + } + return false +} diff --git a/pkg/component/runtime/log_writer_test.go b/pkg/component/runtime/log_writer_test.go new file mode 100644 index 00000000000..5da512e9f77 --- /dev/null +++ b/pkg/component/runtime/log_writer_test.go @@ -0,0 +1,186 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/elastic/elastic-agent/pkg/component" +) + +type wrote struct { + entry zapcore.Entry + fields []zapcore.Field +} + +func TestLogWriter(t *testing.T) { + scenarios := []struct { + Name string + Config component.CommandLogSpec + Lines []string + Wrote []wrote + }{ + { + Name: "multi plain text line", + Lines: []string{ + "simple written line\r\n", + "another written line\n", + }, + Wrote: []wrote{ + { + entry: zapcore.Entry{ + Level: zapcore.InfoLevel, + Time: time.Time{}, + Message: "simple written line", + }, + }, + { + entry: zapcore.Entry{ + Level: zapcore.InfoLevel, + Time: time.Time{}, + Message: "another written line", + }, + }, + }, + }, + { + Name: "multi split text line", + Lines: []string{ + "simple written line\r\n", + " another line sp", + "lit on ", + "", + "multi writes\n", + "\r\n", + "\n", + }, + Wrote: []wrote{ + { + entry: zapcore.Entry{ + Level: zapcore.InfoLevel, + Time: time.Time{}, + Message: "simple written line", + }, + }, + { + entry: zapcore.Entry{ + Level: zapcore.InfoLevel, + Time: time.Time{}, + Message: "another line split on multi writes", + }, + }, + }, + }, + { + Name: "json log line split", + Config: component.CommandLogSpec{ + LevelKey: "log.level", + TimeKey: "@timestamp", + TimeFormat: time.RFC3339Nano, + MessageKey: "message", + IgnoreKeys: []string{"ignore"}, + }, + Lines: []string{ + `{"@timestamp": "2009-11-10T23:00:00Z", "log.level": "debug", "message": "message`, + ` field", "string": "extra", "int": 50, "ignore": "other"}`, + "\n", + }, + Wrote: []wrote{ + { + entry: zapcore.Entry{ + Level: zapcore.DebugLevel, + Time: parseTime("2009-11-10T23:00:00Z", time.RFC3339Nano), + Message: "message field", + }, + fields: []zapcore.Field{ + zap.String("string", "extra"), + zap.Float64("int", 50), + }, + }, + }, + }, + { + Name: "invalid JSON line", + Lines: []string{ + `{"broken": json`, + "\n", + }, + Wrote: []wrote{ + { + entry: zapcore.Entry{ + Level: zapcore.InfoLevel, + Time: time.Time{}, + Message: `{"broken": json`, + }, + }, + }, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.Name, func(t *testing.T) { + c := &captureCore{} + w := newLogWriter(c, scenario.Config) + for _, line := range scenario.Lines { + l := len([]byte(line)) + c, err := w.Write([]byte(line)) + require.NoError(t, err) + require.Equal(t, l, c) + } + require.Len(t, c.wrote, len(scenario.Wrote)) + for i := 0; i < len(scenario.Wrote); i++ { + e := scenario.Wrote[i] + o := c.wrote[i] + if e.entry.Time.IsZero() { + // can't ensure times match; set it to observed before ensuring its equal + e.entry.Time = o.entry.Time + } + assert.Equal(t, e.entry, o.entry) + + // ensure the fields are in the same order (doesn't really matter for logging; but test cares) + if len(e.fields) > 0 { + sortFields(e.fields) + } + if len(o.fields) > 0 { + sortFields(o.fields) + } + assert.EqualValues(t, e.fields, o.fields) + } + }) + } +} + +type captureCore struct { + wrote []wrote +} + +func (c *captureCore) Write(entry zapcore.Entry, fields []zapcore.Field) error { + c.wrote = append(c.wrote, wrote{ + entry: entry, + fields: fields, + }) + return nil +} + +func parseTime(t string, format string) time.Time { + v, err := time.Parse(format, t) + if err != nil { + panic(err) + } + return v +} + +func sortFields(fields []zapcore.Field) { + sort.Slice(fields, func(i, j int) bool { + return fields[i].Key < fields[j].Key + }) +} diff --git a/pkg/component/runtime/runtime.go b/pkg/component/runtime/runtime.go index 0ed1b46c26c..aa780a002e5 100644 --- a/pkg/component/runtime/runtime.go +++ b/pkg/component/runtime/runtime.go @@ -60,7 +60,7 @@ func NewComponentRuntime(comp component.Component, logger *logger.Logger, monito } if comp.InputSpec != nil { if comp.InputSpec.Spec.Command != nil { - return NewCommandRuntime(comp, monitor) + return NewCommandRuntime(comp, logger, monitor) } if comp.InputSpec.Spec.Service != nil { return NewServiceRuntime(comp, logger) @@ -69,7 +69,7 @@ func NewComponentRuntime(comp component.Component, logger *logger.Logger, monito } if comp.ShipperSpec != nil { if comp.ShipperSpec.Spec.Command != nil { - return NewCommandRuntime(comp, monitor) + return NewCommandRuntime(comp, logger, monitor) } return nil, errors.New("components for shippers can only support command runtime") } diff --git a/pkg/component/spec.go b/pkg/component/spec.go index e7ec47a5811..fd109414736 100644 --- a/pkg/component/spec.go +++ b/pkg/component/spec.go @@ -78,6 +78,7 @@ type CommandSpec struct { Args []string `config:"args,omitempty" yaml:"args,omitempty"` Env []CommandEnvSpec `config:"env,omitempty" yaml:"env,omitempty"` Timeouts CommandTimeoutSpec `config:"timeouts" yaml:"timeouts"` + Log CommandLogSpec `config:"log" yaml:"log"` } // CommandEnvSpec is the specification that defines environment variables that will be set to execute the subprocess. @@ -100,6 +101,23 @@ func (t *CommandTimeoutSpec) InitDefaults() { t.Stop = 30 * time.Second } +// CommandLogSpec is the log specification for subprocess. +type CommandLogSpec struct { + LevelKey string `config:"level_key" yaml:"level_key"` + TimeKey string `config:"time_key" yaml:"time_key"` + TimeFormat string `config:"time_format" yaml:"time_format"` + MessageKey string `config:"message_key" yaml:"message_key"` + IgnoreKeys []string `config:"ignore_keys" yaml:"ignore_keys"` +} + +// InitDefaults initialized the defaults for the timeouts. +func (t *CommandLogSpec) InitDefaults() { + t.LevelKey = "log.level" + t.TimeKey = "@timestamp" + t.TimeFormat = "2006-01-02T15:04:05.000Z0700" + t.MessageKey = "message" +} + // ServiceTimeoutSpec is the timeout specification for subprocess. type ServiceTimeoutSpec struct { Checkin time.Duration `config:"checkin" yaml:"checkin"` diff --git a/pkg/core/logger/logger.go b/pkg/core/logger/logger.go index 049fd271038..8c1aa50e98e 100644 --- a/pkg/core/logger/logger.go +++ b/pkg/core/logger/logger.go @@ -58,6 +58,13 @@ func NewFromConfig(name string, cfg *Config, logInternal bool) (*Logger, error) return new(name, cfg, logInternal) } +// NewWithoutConfig returns a new logger without having a configuration. +// +// Use only when a clean logger is needed, and it is known that the logging configuration has already been performed. +func NewWithoutConfig(name string) *Logger { + return logp.NewLogger(name) +} + func new(name string, cfg *Config, logInternal bool) (*Logger, error) { commonCfg, err := toCommonConfig(cfg) if err != nil { diff --git a/specs/apm-server.spec.yml b/specs/apm-server.spec.yml index e646e9facce..0545d7ec307 100644 --- a/specs/apm-server.spec.yml +++ b/specs/apm-server.spec.yml @@ -1,23 +1,25 @@ -version: 2 -inputs: - - name: apm - description: "APM Server" - platforms: - - linux/amd64 - - linux/arm64 - - darwin/amd64 - - darwin/arm64 - - windows/amd64 - - container/amd64 - - container/arm64 - outputs: - - elasticsearch - - kafka - - logstash - - redis - command: - args: - - "-E" - - "management.enabled=true" - - "-E" - - "gc_percent=${APMSERVER_GOGC:100}" +version: 2 +inputs: + - name: apm + description: "APM Server" + platforms: + - linux/amd64 + - linux/arm64 + - darwin/amd64 + - darwin/arm64 + - windows/amd64 + - container/amd64 + - container/arm64 + outputs: + - elasticsearch + - kafka + - logstash + - redis + command: + args: + - "-E" + - "management.enabled=true" + - "-E" + - "gc_percent=${APMSERVER_GOGC:100}" + - "-E" + - "logging.to_stderr=true" diff --git a/specs/auditbeat.spec.yml b/specs/auditbeat.spec.yml index f8c46a96873..a54a47fbbe8 100644 --- a/specs/auditbeat.spec.yml +++ b/specs/auditbeat.spec.yml @@ -1,43 +1,45 @@ -version: 2 -inputs: - - name: audit/auditd - description: "Auditd" - platforms: &platforms - - linux/amd64 - - linux/arm64 - - darwin/amd64 - - darwin/arm64 - - windows/amd64 - - container/amd64 - - container/arm64 - outputs: &outputs - - elasticsearch - - kafka - - logstash - - redis - command: - args: &args - - "-E" - - "setup.ilm.enabled=false" - - "-E" - - "setup.template.enabled=false" - - "-E" - - "management.enabled=true" - - "-E" - - "logging.level=debug" - - "-E" - - "gc_percent=${AUDITBEAT_GOGC:100}" - - "-E" - - "auditbeat.config.modules.enabled=false" - - name: audit/file_integrity - description: "Audit File Integrity" - platforms: *platforms - outputs: *outputs - command: - args: *args - - name: audit/system - description: "Audit System" - platforms: *platforms - outputs: *outputs - command: - args: *args +version: 2 +inputs: + - name: audit/auditd + description: "Auditd" + platforms: &platforms + - linux/amd64 + - linux/arm64 + - darwin/amd64 + - darwin/arm64 + - windows/amd64 + - container/amd64 + - container/arm64 + outputs: &outputs + - elasticsearch + - kafka + - logstash + - redis + command: + args: &args + - "-E" + - "setup.ilm.enabled=false" + - "-E" + - "setup.template.enabled=false" + - "-E" + - "management.enabled=true" + - "-E" + - "logging.level=info" + - "-E" + - "logging.to_stderr=true" + - "-E" + - "gc_percent=${AUDITBEAT_GOGC:100}" + - "-E" + - "auditbeat.config.modules.enabled=false" + - name: audit/file_integrity + description: "Audit File Integrity" + platforms: *platforms + outputs: *outputs + command: + args: *args + - name: audit/system + description: "Audit System" + platforms: *platforms + outputs: *outputs + command: + args: *args diff --git a/specs/cloudbeat.spec.yml b/specs/cloudbeat.spec.yml index 1ecbe47e330..337ac250622 100644 --- a/specs/cloudbeat.spec.yml +++ b/specs/cloudbeat.spec.yml @@ -1,39 +1,41 @@ -version: 2 -inputs: - - name: cloudbeat - description: "Cloudbeat" - platforms: &platforms - - linux/amd64 - - linux/arm64 - - darwin/amd64 - - darwin/arm64 - - windows/amd64 - - container/amd64 - - container/arm64 - outputs: &outputs - - elasticsearch - - kafka - - logstash - - redis - command: - args: &args - - "-E" - - "management.enabled=true" - - "-E" - - "setup.ilm.enabled=false" - - "-E" - - "setup.template.enabled=false" - - "-E" - - "gc_percent=${CLOUDBEAT_GOGC:100}" - - name: cloudbeat/cis_k8s - description: "CIS Kubernetes monitoring" - platforms: *platforms - outputs: *outputs - command: - args: *args - - name: cloudbeat/cis_eks - description: "CIS elastic Kubernetes monitoring" - platforms: *platforms - outputs: *outputs - command: - args: *args \ No newline at end of file +version: 2 +inputs: + - name: cloudbeat + description: "Cloudbeat" + platforms: &platforms + - linux/amd64 + - linux/arm64 + - darwin/amd64 + - darwin/arm64 + - windows/amd64 + - container/amd64 + - container/arm64 + outputs: &outputs + - elasticsearch + - kafka + - logstash + - redis + command: + args: &args + - "-E" + - "management.enabled=true" + - "-E" + - "setup.ilm.enabled=false" + - "-E" + - "setup.template.enabled=false" + - "-E" + - "logging.to_stderr=true" + - "-E" + - "gc_percent=${CLOUDBEAT_GOGC:100}" + - name: cloudbeat/cis_k8s + description: "CIS Kubernetes monitoring" + platforms: *platforms + outputs: *outputs + command: + args: *args + - name: cloudbeat/cis_eks + description: "CIS elastic Kubernetes monitoring" + platforms: *platforms + outputs: *outputs + command: + args: *args diff --git a/specs/filebeat.spec.yml b/specs/filebeat.spec.yml index e18fcbb1e65..609fa1f5804 100644 --- a/specs/filebeat.spec.yml +++ b/specs/filebeat.spec.yml @@ -26,7 +26,9 @@ inputs: - "-E" - "management.enabled=true" - "-E" - - "logging.level=debug" + - "logging.level=info" + - "-E" + - "logging.to_stderr=true" - "-E" - "gc_percent=${FILEBEAT_GOGC:100}" - "-E" diff --git a/specs/heartbeat.spec.yml b/specs/heartbeat.spec.yml index ba6a08934b8..4036020396a 100644 --- a/specs/heartbeat.spec.yml +++ b/specs/heartbeat.spec.yml @@ -21,7 +21,9 @@ inputs: - "-E" - "management.enabled=true" - "-E" - - "logging.level=debug" + - "logging.level=info" + - "-E" + - "logging.to_stderr=true" - "-E" - "gc_percent=${HEARTBEAT_GOGC:100}" - name: synthetics/http diff --git a/specs/metricbeat.spec.yml b/specs/metricbeat.spec.yml index b7c88ad4864..e795c3b6710 100644 --- a/specs/metricbeat.spec.yml +++ b/specs/metricbeat.spec.yml @@ -26,7 +26,9 @@ inputs: - "-E" - "management.enabled=true" - "-E" - - "logging.level=debug" + - "logging.level=info" + - "-E" + - "logging.to_stderr=true" - "-E" - "gc_percent=${METRICBEAT_GOGC:100}" - "-E" diff --git a/specs/osquerybeat.spec.yml b/specs/osquerybeat.spec.yml index 31edb9a3edb..2bf4e53b8f8 100644 --- a/specs/osquerybeat.spec.yml +++ b/specs/osquerybeat.spec.yml @@ -1,26 +1,28 @@ -version: 2 -inputs: - - name: osquery - description: "Osquery" - platforms: - - linux/amd64 - - linux/arm64 - - darwin/amd64 - - darwin/arm64 - - windows/amd64 - - container/amd64 - - container/arm64 - outputs: - - elasticsearch - command: - args: - - "-E" - - "setup.ilm.enabled=false" - - "-E" - - "setup.template.enabled=false" - - "-E" - - "management.enabled=true" - - "-E" - - "logging.level=debug" - - "-E" - - "gc_percent=${OSQUERYBEAT_GOGC:100}" +version: 2 +inputs: + - name: osquery + description: "Osquery" + platforms: + - linux/amd64 + - linux/arm64 + - darwin/amd64 + - darwin/arm64 + - windows/amd64 + - container/amd64 + - container/arm64 + outputs: + - elasticsearch + command: + args: + - "-E" + - "setup.ilm.enabled=false" + - "-E" + - "setup.template.enabled=false" + - "-E" + - "management.enabled=true" + - "-E" + - "logging.level=info" + - "-E" + - "logging.to_stderr=true" + - "-E" + - "gc_percent=${OSQUERYBEAT_GOGC:100}" diff --git a/specs/packetbeat.spec.yml b/specs/packetbeat.spec.yml index 0519078cac8..cd788b89add 100644 --- a/specs/packetbeat.spec.yml +++ b/specs/packetbeat.spec.yml @@ -1,29 +1,31 @@ -version: 2 -inputs: - - name: packet - description: "Packet Capture" - platforms: - - linux/amd64 - - linux/arm64 - - darwin/amd64 - - darwin/arm64 - - windows/amd64 - - container/amd64 - - container/arm64 - outputs: - - elasticsearch - - kafka - - logstash - - redis - command: - args: - - "-E" - - "setup.ilm.enabled=false" - - "-E" - - "setup.template.enabled=false" - - "-E" - - "management.enabled=true" - - "-E" - - "logging.level=debug" - - "-E" - - "gc_percent=${PACKETBEAT_GOGC:100}" +version: 2 +inputs: + - name: packet + description: "Packet Capture" + platforms: + - linux/amd64 + - linux/arm64 + - darwin/amd64 + - darwin/arm64 + - windows/amd64 + - container/amd64 + - container/arm64 + outputs: + - elasticsearch + - kafka + - logstash + - redis + command: + args: + - "-E" + - "setup.ilm.enabled=false" + - "-E" + - "setup.template.enabled=false" + - "-E" + - "management.enabled=true" + - "-E" + - "logging.level=info" + - "-E" + - "logging.to_stderr=true" + - "-E" + - "gc_percent=${PACKETBEAT_GOGC:100}"