Skip to content

Commit

Permalink
rate limit Elasticsearch client indexing error logs
Browse files Browse the repository at this point in the history
Use `periodic.Doer` to rate limit indexing error logs. This does not affects the logs sent to the event logger.
  • Loading branch information
AndersonQ committed Aug 9, 2024
1 parent 63fc18c commit 47acb51
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix FQDN being lowercased when used as `host.hostname` {issue}39993[39993]
- Beats won't log start up information when running under the Elastic Agent {40390}40390[40390]
- Filebeat now needs `dup3`, `faccessat2`, `prctl` and `setrlimit` syscalls to run the journald input. If this input is not being used, the syscalls are not needed. All Beats have those syscalls allowed now because the default seccomp policy is global to all Beats. {pull}40061[40061]
- Beats will rate limit the logs about errors when indexing events on Elasticsearch, logging a summary every 10s. The logs sent to the event log is unchanged. {issue}40157[40157]

*Auditbeat*

Expand Down
6 changes: 3 additions & 3 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12999,12 +12999,12 @@ SOFTWARE


--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-libs
Version: v0.9.15
Dependency : github.com/AndersonQ/elastic-agent-libs
Version: v0.0.0-20240807080033-80e14fea8635
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.9.15/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/!anderson!q/elastic-agent-libs@v0.0.0-20240807080033-80e14fea8635/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ require (
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.8.1
github.com/elastic/elastic-agent-libs v0.9.15
github.com/elastic/elastic-agent-libs v0.10.0
github.com/elastic/elastic-agent-system-metrics v0.11.0
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,8 @@ github.com/elastic/elastic-agent-autodiscover v0.8.1 h1:u6TWqh7wfevu6S4GUq4SIxYB
github.com/elastic/elastic-agent-autodiscover v0.8.1/go.mod h1:0gzGsaDCAqBfUZjuCqqWsSI60eaZ778A5tQZV72rPV0=
github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE=
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M=
github.com/elastic/elastic-agent-libs v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8=
github.com/elastic/elastic-agent-libs v0.10.0 h1:W7uvay0UYdLPtauXGsMD8Xfoe4qtcVWQR4icBgf/26Q=
github.com/elastic/elastic-agent-libs v0.10.0/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8=
github.com/elastic/elastic-agent-system-metrics v0.11.0 h1:/bWrgTsHZWLUhdT7WPNuQDFkrSfm+A4qf6QDQnZo9d8=
github.com/elastic/elastic-agent-system-metrics v0.11.0/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
Expand Down
40 changes: 34 additions & 6 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/periodic"
"github.com/elastic/elastic-agent-libs/testing"
"github.com/elastic/elastic-agent-libs/version"
)
Expand All @@ -58,7 +59,10 @@ type Client struct {
// forwarded to this index. Otherwise, they will be dropped.
deadLetterIndex string

log *logp.Logger
log *logp.Logger
rlLogIndex *periodic.Doer
rlLogIndexTryDeadLetter *periodic.Doer
rlLogDeadLetter *periodic.Doer
}

// clientSettings contains the settings for a client.
Expand Down Expand Up @@ -154,20 +158,44 @@ func NewClient(
return nil
}

// Make sure there's a non-nil obser
// Make sure there's a non-nil observer
observer := s.observer
if observer == nil {
observer = outputs.NewNilObserver()
}

log := logp.NewLogger("elasticsearch")

pLogDeadLetter := periodic.NewDoer(10*time.Second,
func(count uint64, d time.Duration) {
log.Errorf(
"Failed to deliver to dead letter index %d events in last %s. Look at the event log to view the event and cause.", count, d)
})
pLogIndex := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) {
log.Warnf(
"Failed to index %d events in last %s: events were dropped! Look at the event log to view the event and cause.",
count, d)
})
pLogIndexTryDeadLetter := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) {
log.Warnf(
"Failed to index %d events in last %s: tried dead letter index. Look at the event log to view the event and cause.",
count, d)
})

pLogDeadLetter.Start()
pLogIndex.Start()
pLogIndexTryDeadLetter.Start()
client := &Client{
conn: *conn,
indexSelector: s.indexSelector,
pipelineSelector: pipeline,
observer: observer,
deadLetterIndex: s.deadLetterIndex,

log: logp.NewLogger("elasticsearch"),
log: log,
rlLogDeadLetter: pLogDeadLetter,
rlLogIndex: pLogIndex,
rlLogIndexTryDeadLetter: pLogIndexTryDeadLetter,
}

return client, nil
Expand Down Expand Up @@ -478,14 +506,14 @@ func (client *Client) applyItemStatus(
if encodedEvent.deadLetter {
// Fatal error while sending an already-failed event to the dead letter
// index, drop.
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look at the event log to view the event and cause.", itemStatus)
client.rlLogDeadLetter.Add()
client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
stats.nonIndexable++
return false
}
if client.deadLetterIndex == "" {
// Fatal error and no dead letter index, drop.
client.log.Warnf("Cannot index event (status=%v): dropping event! Look at the event log to view the event and cause.", itemStatus)
client.rlLogIndex.Add()
client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
stats.nonIndexable++
return false
Expand All @@ -494,7 +522,7 @@ func (client *Client) applyItemStatus(
// We count this as a "retryable failure", and then if the dead letter
// ingestion succeeds it is counted in the "deadLetter" counter
// rather than the "acked" counter.
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look at the event log to view the event and cause.", itemStatus)
client.rlLogIndexTryDeadLetter.Add()
client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage))
}
Expand Down

0 comments on commit 47acb51

Please sign in to comment.