Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limit Elasticsearch client indexing error logs #40448

Merged
merged 5 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Beats won't log start up information when running under the Elastic Agent {pull}40390[40390]
- Elevate effective capability set to match the Permitted set for agentbeat {pull}40466[40466]
- Filebeat now needs `dup3`, `faccessat2`, `prctl` and `setrlimit` syscalls to run the journald input. If this input is not being used, the syscalls are not needed. All Beats have those syscalls allowed now because the default seccomp policy is global to all Beats. {pull}40061[40061]
- Beats will rate limit the logs about errors when indexing events on Elasticsearch, logging a summary every 10s. The logs sent to the event log are unchanged. {issue}40157[40157]

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13000,11 +13000,11 @@ SOFTWARE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-libs
Version: v0.9.15
Version: v0.10.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.9.15/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.10.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
7 changes: 4 additions & 3 deletions filebeat/tests/integration/event_log_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ func TestEventsLoggerESOutput(t *testing.T) {

_, _ = logFile.WriteString(`
{"message":"foo bar","int":10,"string":"str"}
{"message":"index failure 1","int":"not a number","string":10}
{"message":"another message","int":20,"string":"str2"}
{"message":"index failure","int":"not a number","string":10}
{"message":"second index failure","int":"not a number","string":10}
{"message":"index failure 2","int":"not a number","string":10}
{"message":"index failure 3","int":"not a number","string":10}
`)
if err := logFile.Sync(); err != nil {
t.Fatalf("could not sync log file '%s': %s", logFilePath, err)
Expand All @@ -99,7 +100,7 @@ func TestEventsLoggerESOutput(t *testing.T) {

// Wait for a log entry that indicates an entry in the events
// logger file.
msg := "Cannot index event (status=400)"
msg := "Failed to index 3 events in last"
require.Eventually(t, func() bool {
return filebeat.LogContains(msg)
}, time.Minute, 100*time.Millisecond,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ require (
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.8.1
github.com/elastic/elastic-agent-libs v0.9.15
github.com/elastic/elastic-agent-libs v0.10.0
github.com/elastic/elastic-agent-system-metrics v0.11.1
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,8 @@ github.com/elastic/elastic-agent-autodiscover v0.8.1 h1:u6TWqh7wfevu6S4GUq4SIxYB
github.com/elastic/elastic-agent-autodiscover v0.8.1/go.mod h1:0gzGsaDCAqBfUZjuCqqWsSI60eaZ778A5tQZV72rPV0=
github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE=
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M=
github.com/elastic/elastic-agent-libs v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8=
github.com/elastic/elastic-agent-libs v0.10.0 h1:W7uvay0UYdLPtauXGsMD8Xfoe4qtcVWQR4icBgf/26Q=
github.com/elastic/elastic-agent-libs v0.10.0/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8=
github.com/elastic/elastic-agent-system-metrics v0.11.1 h1:BxViQHnqxvvi/65rj3mGwG6Eto6ldFCTnuDTUJnakaU=
github.com/elastic/elastic-agent-system-metrics v0.11.1/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
Expand Down
40 changes: 34 additions & 6 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/periodic"
"github.com/elastic/elastic-agent-libs/testing"
"github.com/elastic/elastic-agent-libs/version"
)
Expand All @@ -58,7 +59,10 @@ type Client struct {
// forwarded to this index. Otherwise, they will be dropped.
deadLetterIndex string

log *logp.Logger
log *logp.Logger
pLogIndex *periodic.Doer
pLogIndexTryDeadLetter *periodic.Doer
pLogDeadLetter *periodic.Doer
}

// clientSettings contains the settings for a client.
Expand Down Expand Up @@ -154,20 +158,44 @@ func NewClient(
return nil
}

// Make sure there's a non-nil obser
// Make sure there's a non-nil observer
observer := s.observer
if observer == nil {
observer = outputs.NewNilObserver()
}

log := logp.NewLogger("elasticsearch")

pLogDeadLetter := periodic.NewDoer(10*time.Second,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding a comment explaining why we are rate limiting these so future readers don't need to guess, and can avoid adding new per-event info logs.

func(count uint64, d time.Duration) {
log.Errorf(
"Failed to deliver to dead letter index %d events in last %s. Look at the event log to view the event and cause.", count, d)
})
pLogIndex := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) {
log.Warnf(
"Failed to index %d events in last %s: events were dropped! Look at the event log to view the event and cause.",
count, d)
})
pLogIndexTryDeadLetter := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) {
log.Warnf(
"Failed to index %d events in last %s: tried dead letter index. Look at the event log to view the event and cause.",
count, d)
})

pLogDeadLetter.Start()
pLogIndex.Start()
pLogIndexTryDeadLetter.Start()
client := &Client{
conn: *conn,
indexSelector: s.indexSelector,
pipelineSelector: pipeline,
observer: observer,
deadLetterIndex: s.deadLetterIndex,

log: logp.NewLogger("elasticsearch"),
log: log,
pLogDeadLetter: pLogDeadLetter,
pLogIndex: pLogIndex,
pLogIndexTryDeadLetter: pLogIndexTryDeadLetter,
}

return client, nil
Expand Down Expand Up @@ -478,14 +506,14 @@ func (client *Client) applyItemStatus(
if encodedEvent.deadLetter {
// Fatal error while sending an already-failed event to the dead letter
// index, drop.
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look at the event log to view the event and cause.", itemStatus)
client.pLogDeadLetter.Add()
client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
stats.nonIndexable++
return false
}
if client.deadLetterIndex == "" {
// Fatal error and no dead letter index, drop.
client.log.Warnf("Cannot index event (status=%v): dropping event! Look at the event log to view the event and cause.", itemStatus)
client.pLogIndex.Add()
client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
stats.nonIndexable++
return false
Expand All @@ -494,7 +522,7 @@ func (client *Client) applyItemStatus(
// We count this as a "retryable failure", and then if the dead letter
// ingestion succeeds it is counted in the "deadLetter" counter
// rather than the "acked" counter.
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look at the event log to view the event and cause.", itemStatus)
client.pLogIndexTryDeadLetter.Add()
client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage))
}
Expand Down
Loading