diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c5872e91958..073cb27b426 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -15,6 +15,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Beats won't log start up information when running under the Elastic Agent {40390}40390[40390] - Elevate effective capability set to match the Permitted set for agentbeat {pull}40466[40466] - Filebeat now needs `dup3`, `faccessat2`, `prctl` and `setrlimit` syscalls to run the journald input. If this input is not being used, the syscalls are not needed. All Beats have those syscalls allowed now because the default seccomp policy is global to all Beats. {pull}40061[40061] +- Beats will rate limit the logs about errors when indexing events on Elasticsearch, logging a summary every 10s. The logs sent to the event log are unchanged. {issue}40157[40157] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index f1ade24db6e..a86a98a6869 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -13000,11 +13000,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-libs -Version: v0.9.15 +Version: v0.10.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.9.15/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.10.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/filebeat/tests/integration/event_log_file_test.go b/filebeat/tests/integration/event_log_file_test.go index 5b2758b4018..685f7807ccc 100644 --- a/filebeat/tests/integration/event_log_file_test.go +++ b/filebeat/tests/integration/event_log_file_test.go @@ -84,9 +84,10 @@ func TestEventsLoggerESOutput(t *testing.T) { _, _ = logFile.WriteString(` {"message":"foo 
bar","int":10,"string":"str"} +{"message":"index failure 1","int":"not a number","string":10} {"message":"another message","int":20,"string":"str2"} -{"message":"index failure","int":"not a number","string":10} -{"message":"second index failure","int":"not a number","string":10} +{"message":"index failure 2","int":"not a number","string":10} +{"message":"index failure 3","int":"not a number","string":10} `) if err := logFile.Sync(); err != nil { t.Fatalf("could not sync log file '%s': %s", logFilePath, err) @@ -99,7 +100,7 @@ func TestEventsLoggerESOutput(t *testing.T) { // Wait for a log entry that indicates an entry in the events // logger file. - msg := "Cannot index event (status=400)" + msg := "Failed to index 3 events in last" require.Eventually(t, func() bool { return filebeat.LogContains(msg) }, time.Minute, 100*time.Millisecond, diff --git a/go.mod b/go.mod index 5a8246836d6..8326ca91cf1 100644 --- a/go.mod +++ b/go.mod @@ -193,7 +193,7 @@ require ( github.com/elastic/bayeux v1.0.5 github.com/elastic/ebpfevents v0.6.0 github.com/elastic/elastic-agent-autodiscover v0.8.1 - github.com/elastic/elastic-agent-libs v0.9.15 + github.com/elastic/elastic-agent-libs v0.10.0 github.com/elastic/elastic-agent-system-metrics v0.11.1 github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff diff --git a/go.sum b/go.sum index ad756bd377f..747473783f8 100644 --- a/go.sum +++ b/go.sum @@ -560,8 +560,8 @@ github.com/elastic/elastic-agent-autodiscover v0.8.1 h1:u6TWqh7wfevu6S4GUq4SIxYB github.com/elastic/elastic-agent-autodiscover v0.8.1/go.mod h1:0gzGsaDCAqBfUZjuCqqWsSI60eaZ778A5tQZV72rPV0= github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE= github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= -github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M= -github.com/elastic/elastic-agent-libs 
v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= +github.com/elastic/elastic-agent-libs v0.10.0 h1:W7uvay0UYdLPtauXGsMD8Xfoe4qtcVWQR4icBgf/26Q= +github.com/elastic/elastic-agent-libs v0.10.0/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= github.com/elastic/elastic-agent-system-metrics v0.11.1 h1:BxViQHnqxvvi/65rj3mGwG6Eto6ldFCTnuDTUJnakaU= github.com/elastic/elastic-agent-system-metrics v0.11.1/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 933d04c789c..91084141652 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/periodic" "github.com/elastic/elastic-agent-libs/testing" "github.com/elastic/elastic-agent-libs/version" ) @@ -58,7 +59,10 @@ type Client struct { // forwarded to this index. Otherwise, they will be dropped. deadLetterIndex string - log *logp.Logger + log *logp.Logger + pLogIndex *periodic.Doer + pLogIndexTryDeadLetter *periodic.Doer + pLogDeadLetter *periodic.Doer } // clientSettings contains the settings for a client. @@ -154,12 +158,33 @@ func NewClient( return nil } - // Make sure there's a non-nil obser + // Make sure there's a non-nil observer observer := s.observer if observer == nil { observer = outputs.NewNilObserver() } + log := logp.NewLogger("elasticsearch") + + pLogDeadLetter := periodic.NewDoer(10*time.Second, + func(count uint64, d time.Duration) { + log.Errorf( + "Failed to deliver to dead letter index %d events in last %s. 
Look at the event log to view the event and cause.", count, d) + }) + pLogIndex := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) { + log.Warnf( + "Failed to index %d events in last %s: events were dropped! Look at the event log to view the event and cause.", + count, d) + }) + pLogIndexTryDeadLetter := periodic.NewDoer(10*time.Second, func(count uint64, d time.Duration) { + log.Warnf( + "Failed to index %d events in last %s: tried dead letter index. Look at the event log to view the event and cause.", + count, d) + }) + + pLogDeadLetter.Start() + pLogIndex.Start() + pLogIndexTryDeadLetter.Start() client := &Client{ conn: *conn, indexSelector: s.indexSelector, @@ -167,7 +192,10 @@ func NewClient( observer: observer, deadLetterIndex: s.deadLetterIndex, - log: logp.NewLogger("elasticsearch"), + log: log, + pLogDeadLetter: pLogDeadLetter, + pLogIndex: pLogIndex, + pLogIndexTryDeadLetter: pLogIndexTryDeadLetter, } return client, nil @@ -478,14 +506,14 @@ func (client *Client) applyItemStatus( if encodedEvent.deadLetter { // Fatal error while sending an already-failed event to the dead letter // index, drop. - client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look at the event log to view the event and cause.", itemStatus) + client.pLogDeadLetter.Add() client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) stats.nonIndexable++ return false } if client.deadLetterIndex == "" { // Fatal error and no dead letter index, drop. - client.log.Warnf("Cannot index event (status=%v): dropping event! 
Look at the event log to view the event and cause.", itemStatus) + client.pLogIndex.Add() client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) stats.nonIndexable++ return false @@ -494,7 +522,7 @@ func (client *Client) applyItemStatus( // We count this as a "retryable failure", and then if the dead letter // ingestion succeeds it is counted in the "deadLetter" counter // rather than the "acked" counter. - client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look at the event log to view the event and cause.", itemStatus) + client.pLogIndexTryDeadLetter.Add() client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage)) }