From 67949a1d366a5c9b4e885ae05c1b34c618447e00 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Wed, 7 Aug 2024 10:23:49 +0200 Subject: [PATCH] wip: rate limit ES client index error logs --- dev-tools/notice/overrides.json | 1 + go.mod | 2 + go.sum | 4 +- libbeat/outputs/elasticsearch/client.go | 50 +++++++++++++++++++++---- 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/dev-tools/notice/overrides.json b/dev-tools/notice/overrides.json index eee18acc0de5..93133d46398f 100644 --- a/dev-tools/notice/overrides.json +++ b/dev-tools/notice/overrides.json @@ -1,3 +1,4 @@ +{"name": "github.com/AndersonQ/elastic-agent-client/v7", "licenceType": "Elastic"} {"name": "github.com/elastic/elastic-agent-client/v7", "licenceType": "Elastic"} {"name": "github.com/elastic/elastic-agent-shipper-client", "licenceType": "Elastic"} {"name": "github.com/gorhill/cronexpr", "licenceType": "Apache-2.0", "licenceFile":"APLv2"} diff --git a/go.mod b/go.mod index 49a19fc29fd7..ce2dcfb60ad3 100644 --- a/go.mod +++ b/go.mod @@ -411,3 +411,5 @@ replace ( github.com/insomniacslk/dhcp => github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 // indirect github.com/snowflakedb/gosnowflake => github.com/snowflakedb/gosnowflake v1.6.19 ) + +replace github.com/elastic/elastic-agent-libs => github.com/AndersonQ/elastic-agent-libs v0.0.0-20240807080033-80e14fea8635 diff --git a/go.sum b/go.sum index 96eb13a36d23..df3835a7b7fb 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN github.com/99designs/keyring v1.2.1/go.mod h1:fc+wB5KTk9wQ9sDx0kFXB3A0MaeGHM9AwRStKOQ5vOA= github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= +github.com/AndersonQ/elastic-agent-libs v0.0.0-20240807080033-80e14fea8635 
h1:TYML1M8hFz+CMZV4rDlGJfPBEn7tli35xEP8wFCB+HM= +github.com/AndersonQ/elastic-agent-libs v0.0.0-20240807080033-80e14fea8635/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= github.com/Azure/azure-amqp-common-go/v4 v4.2.0 h1:q/jLx1KJ8xeI8XGfkOWMN9XrXzAfVTkyvCxPvHCjd2I= github.com/Azure/azure-amqp-common-go/v4 v4.2.0/go.mod h1:GD3m/WPPma+621UaU6KNjKEo5Hl09z86viKwQjTpV0Q= github.com/Azure/azure-event-hubs-go/v3 v3.6.1 h1:vSiMmn3tOwgiLyfnmhT5K6Of/3QWRLaaNZPI0hFvZyU= @@ -556,8 +558,6 @@ github.com/elastic/elastic-agent-autodiscover v0.8.1 h1:u6TWqh7wfevu6S4GUq4SIxYB github.com/elastic/elastic-agent-autodiscover v0.8.1/go.mod h1:0gzGsaDCAqBfUZjuCqqWsSI60eaZ778A5tQZV72rPV0= github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE= github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= -github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M= -github.com/elastic/elastic-agent-libs v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= github.com/elastic/elastic-agent-system-metrics v0.11.0 h1:/bWrgTsHZWLUhdT7WPNuQDFkrSfm+A4qf6QDQnZo9d8= github.com/elastic/elastic-agent-system-metrics v0.11.0/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 933d04c789ca..665f88aba7ba 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -58,7 +58,10 @@ type Client struct { // forwarded to this index. Otherwise, they will be dropped. deadLetterIndex string - log *logp.Logger + log *logp.Logger + rlLogIndex *logp.RateLimitedLogger + rlLogIndexTryDeadLetter *logp.RateLimitedLogger + rlLogDeadLetter *logp.RateLimitedLogger } // clientSettings contains the settings for a client. 
@@ -160,6 +163,37 @@ func NewClient( observer = outputs.NewNilObserver() } + log := logp.NewLogger("elasticsearch") + + rtLogDeadLetter := logp.NewRateLimited(10*time.Second, + func(count uint64, d time.Duration) { + log.Errorf( + "Failed to deliver to dead letter index %d events in last %s. Look at the event log to view the event and cause.", count, d) + log.Errorw(fmt.Sprintf( + "Failed to deliver to dead letter index %d events in last %s. Look at the event log to view the event and cause.", count, d), + logp.TypeKey, logp.EventType) + return + }) + rlLogIndex := logp.NewRateLimited(10*time.Second, func(count uint64, d time.Duration) { + log.Warnf( + "Failed to index %d events in last %s: events were dropped! Look at the event log to view the event and cause.", + count, d) + log.Warnw(fmt.Sprintf( + "Failed to index %d events in last %s: events were dropped", count, d), + logp.TypeKey, logp.EventType) + }) + rlLogIndexTryDeadLetter := logp.NewRateLimited(10*time.Second, func(count uint64, d time.Duration) { + log.Warnf( + "Failed to index %d events in last %s: tried dead letter index. Look at the event log to view the event and cause.", + count, d) + log.Warnw(fmt.Sprintf( + "Failed to index %d events in last %s: tried dead letter index", count, d), + logp.TypeKey, logp.EventType) + }) + + rtLogDeadLetter.Start() + rlLogIndex.Start() + rlLogIndexTryDeadLetter.Start() client := &Client{ conn: *conn, indexSelector: s.indexSelector, @@ -167,7 +201,10 @@ func NewClient( observer: observer, deadLetterIndex: s.deadLetterIndex, - log: logp.NewLogger("elasticsearch"), + log: log, + rlLogDeadLetter: rtLogDeadLetter, + rlLogIndex: rlLogIndex, + rlLogIndexTryDeadLetter: rlLogIndexTryDeadLetter, } return client, nil @@ -478,15 +515,13 @@ func (client *Client) applyItemStatus( if encodedEvent.deadLetter { // Fatal error while sending an already-failed event to the dead letter // index, drop. - client.log.Errorf("Can't deliver to dead letter index event (status=%v). 
Look at the event log to view the event and cause.", itemStatus) - client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) + client.rlLogDeadLetter.Add() stats.nonIndexable++ return false } if client.deadLetterIndex == "" { // Fatal error and no dead letter index, drop. - client.log.Warnf("Cannot index event (status=%v): dropping event! Look at the event log to view the event and cause.", itemStatus) - client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) + client.rlLogIndex.Add() stats.nonIndexable++ return false } @@ -494,8 +529,7 @@ func (client *Client) applyItemStatus( // We count this as a "retryable failure", and then if the dead letter // ingestion succeeds it is counted in the "deadLetter" counter // rather than the "acked" counter. - client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look at the event log to view the event and cause.", itemStatus) - client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) + client.rlLogIndexTryDeadLetter.Add() encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage)) }