Skip to content

Commit

Permalink
Only log publish event messages in trace log level under elastic-agent (
Browse files Browse the repository at this point in the history
#34391)

* Only log publish event messages in trace log level under elastic-agent.

* Add changelog entry.

* Fix changelog.

* Add same logic to processors.
  • Loading branch information
blakerouse authored Jan 31, 2023
1 parent 1046652 commit 15ff7d1
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Log errors from the Elastic Agent V2 client errors channel. Avoids blocking when error occurs communicating with the Elastic Agent. {pull}34392[34392]
- Only log publish event messages in trace log level under elastic-agent. {pull}34391[34391]

*Auditbeat*

Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -81,7 +82,9 @@ func (f *appendProcessor) Run(event *beat.Event) (*beat.Event, error) {
err := f.appendValues(f.config.TargetField, f.config.Fields, f.config.Values, event)
if err != nil {
errMsg := fmt.Errorf("failed to append fields in append processor: %w", err)
f.logger.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.logger.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
if _, err := event.PutValue("error.message", errMsg.Error()); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/copy_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -79,7 +80,9 @@ func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
err := f.copyField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err)
f.logger.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.logger.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/decode_base64_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -84,7 +85,9 @@ func (f *decodeBase64Field) Run(event *beat.Event) (*beat.Event, error) {
err := f.decodeField(event)
if err != nil {
errMsg := fmt.Errorf("failed to decode base64 fields in processor: %v", err)
f.log.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.log.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/decompress_gzip_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -76,7 +77,9 @@ func (f *decompressGzipField) Run(event *beat.Event) (*beat.Event, error) {
err := f.decompressGzipField(event)
if err != nil {
errMsg := fmt.Errorf("Failed to decompress field in decompress_gzip_field processor: %v", err)
f.log.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.log.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/actions/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -84,7 +85,9 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) {
err := f.renameField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err)
f.logger.Debug(errMsg.Error())
if publisher.LogWithTrace() {
f.logger.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
7 changes: 6 additions & 1 deletion libbeat/processors/actions/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type replaceString struct {
config replaceStringConfig
log *logp.Logger
}

type replaceStringConfig struct {
Expand Down Expand Up @@ -69,6 +71,7 @@ func NewReplaceString(c *conf.C) (processors.Processor, error) {

f := &replaceString{
config: config,
log: logp.NewLogger("replace"),
}
return f, nil
}
Expand All @@ -84,7 +87,9 @@ func (f *replaceString) Run(event *beat.Event) (*beat.Event, error) {
err := f.replaceField(field.Field, field.Pattern, field.Replacement, event)
if err != nil {
errMsg := fmt.Errorf("Failed to replace fields in processor: %s", err)
logp.Debug("replace", errMsg.Error())
if publisher.LogWithTrace() {
f.log.Debug(errMsg.Error())
}
if f.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
3 changes: 3 additions & 0 deletions libbeat/processors/actions/replace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"regexp"
"testing"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -140,6 +142,7 @@ func TestReplaceRun(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
f := &replaceString{
log: logp.NewLogger("replace"),
config: replaceStringConfig{
Fields: test.Fields,
IgnoreMissing: test.IgnoreMissing,
Expand Down
5 changes: 4 additions & 1 deletion libbeat/processors/urldecode/urldecode.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/checks"
jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -83,7 +84,9 @@ func (p *urlDecode) Run(event *beat.Event) (*beat.Event, error) {
err := p.decodeField(field.From, field.To, event)
if err != nil {
errMsg := fmt.Errorf("failed to decode fields in urldecode processor: %v", err)
p.log.Debug(errMsg.Error())
if publisher.LogWithTrace() {
p.log.Debug(errMsg.Error())
}
if p.config.FailOnError {
event = backup
event.PutValue("error.message", errMsg.Error())
Expand Down
60 changes: 60 additions & 0 deletions libbeat/publisher/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package publisher

import (
"github.com/elastic/beats/v7/libbeat/common/atomic"
)

var (
// underAgent is set to true with this beat is being ran under the elastic-agent
underAgent = atomic.MakeBool(false)

// underAgentTrace is set to true when the elastic-agent has placed this beat into
// trace mode (which enables logging of published events)
underAgentTrace = atomic.MakeBool(false)
)

// SetUnderAgent sets that the processing pipeline is being ran under the elastic-agent.
func SetUnderAgent(val bool) {
underAgent.Store(val)
}

// SetUnderAgentTrace sets that trace mode has been enabled by the elastic-agent.
//
// SetUnderAgent must also be called and set to true before this has an effect.
func SetUnderAgentTrace(val bool) {
underAgentTrace.Store(val)
}

// UnderAgent returns true when running under Elastic Agent.
func UnderAgent() bool {
return underAgent.Load()
}

// LogWithTrace returns true when not running under Elastic Agent always or
// only true if running under Elastic Agent with trace logging enabled.
func LogWithTrace() bool {
agent := underAgent.Load()
if agent {
trace := underAgentTrace.Load()
return trace
}
// Always true when not running under the Elastic Agent.
return true
}
3 changes: 2 additions & 1 deletion libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/timeseries"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -356,7 +357,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor,
}

// setup 10: debug print final event (P)
if b.log.IsDebug() {
if b.log.IsDebug() || publisher.UnderAgent() {
processors.add(debugPrintProcessor(b.info, b.log))
}

Expand Down
17 changes: 10 additions & 7 deletions libbeat/publisher/processing/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs/codec/json"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -199,15 +200,17 @@ func debugPrintProcessor(info beat.Info, log *logp.Logger) *processorFn {
EscapeHTML: false,
})
return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) {
mux.Lock()
defer mux.Unlock()
if publisher.LogWithTrace() {
mux.Lock()
defer mux.Unlock()

b, err := encoder.Encode(info.Beat, event)
if err != nil {
return event, nil
}
b, err := encoder.Encode(info.Beat, event)
if err != nil {
return event, nil
}

log.Debugf("Publish event: %s", b)
log.Debugf("Publish event: %s", b)
}
return event, nil
})
}
Expand Down
26 changes: 18 additions & 8 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common/reload"
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/version"
)

Expand Down Expand Up @@ -118,6 +119,11 @@ func NewV2AgentManager(config *conf.C, registry *reload.Registry, _ uuid.UUID) (
return nil, fmt.Errorf("error reading control config from agent: %w", err)
}

// officially running under the elastic-agent; we set the publisher pipeline
// to inform it that we are running under elastic-agent (used to ensure "Publish event: "
// debug log messages are only outputted when running in trace mode
publisher.SetUnderAgent(true)

return NewV2AgentManagerWithClient(c, registry, agentClient)
}

Expand Down Expand Up @@ -466,7 +472,9 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) {
}

// set the new log level (if nothing has changed is a noop)
logp.SetLevel(getZapcoreLevel(lowestLevel))
ll, trace := getZapcoreLevel(lowestLevel)
logp.SetLevel(ll)
publisher.SetUnderAgentTrace(trace)

// reload the output configuration
var errs multierror.Errors
Expand Down Expand Up @@ -676,20 +684,22 @@ func getUnitState(status lbmanagement.Status) client.UnitState {
return client.UnitStateStarting
}

func getZapcoreLevel(ll client.UnitLogLevel) zapcore.Level {
func getZapcoreLevel(ll client.UnitLogLevel) (zapcore.Level, bool) {
switch ll {
case client.UnitLogLevelError:
return zapcore.ErrorLevel
return zapcore.ErrorLevel, false
case client.UnitLogLevelWarn:
return zapcore.WarnLevel
return zapcore.WarnLevel, false
case client.UnitLogLevelInfo:
return zapcore.InfoLevel
return zapcore.InfoLevel, false
case client.UnitLogLevelDebug:
return zapcore.DebugLevel
return zapcore.DebugLevel, false
case client.UnitLogLevelTrace:
// beats doesn't support trace
return zapcore.DebugLevel
// but we do allow the "Publish event:" debug logs
// when trace mode is enabled
return zapcore.DebugLevel, true
}
// info level for fallback
return zapcore.InfoLevel
return zapcore.InfoLevel, false
}

0 comments on commit 15ff7d1

Please sign in to comment.