Skip to content

Commit

Permalink
wip: rate limit ES client index error logs
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Aug 7, 2024
1 parent 2060383 commit 67949a1
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
1 change: 1 addition & 0 deletions dev-tools/notice/overrides.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{"name": "github.com/AndersonQ/elastic-agent-client/v7", "licenceType": "Elastic"}
{"name": "github.com/elastic/elastic-agent-client/v7", "licenceType": "Elastic"}
{"name": "github.com/elastic/elastic-agent-shipper-client", "licenceType": "Elastic"}
{"name": "github.com/gorhill/cronexpr", "licenceType": "Apache-2.0", "licenceFile":"APLv2"}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,5 @@ replace (
github.com/insomniacslk/dhcp => github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 // indirect
github.com/snowflakedb/gosnowflake => github.com/snowflakedb/gosnowflake v1.6.19
)

replace github.com/elastic/elastic-agent-libs => github.com/AndersonQ/elastic-agent-libs v0.0.0-20240807080033-80e14fea8635
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN
github.com/99designs/keyring v1.2.1/go.mod h1:fc+wB5KTk9wQ9sDx0kFXB3A0MaeGHM9AwRStKOQ5vOA=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/AndersonQ/elastic-agent-libs v0.0.0-20240807080033-80e14fea8635 h1:TYML1M8hFz+CMZV4rDlGJfPBEn7tli35xEP8wFCB+HM=
github.com/AndersonQ/elastic-agent-libs v0.0.0-20240807080033-80e14fea8635/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8=
github.com/Azure/azure-amqp-common-go/v4 v4.2.0 h1:q/jLx1KJ8xeI8XGfkOWMN9XrXzAfVTkyvCxPvHCjd2I=
github.com/Azure/azure-amqp-common-go/v4 v4.2.0/go.mod h1:GD3m/WPPma+621UaU6KNjKEo5Hl09z86viKwQjTpV0Q=
github.com/Azure/azure-event-hubs-go/v3 v3.6.1 h1:vSiMmn3tOwgiLyfnmhT5K6Of/3QWRLaaNZPI0hFvZyU=
Expand Down Expand Up @@ -556,8 +558,6 @@ github.com/elastic/elastic-agent-autodiscover v0.8.1 h1:u6TWqh7wfevu6S4GUq4SIxYB
github.com/elastic/elastic-agent-autodiscover v0.8.1/go.mod h1:0gzGsaDCAqBfUZjuCqqWsSI60eaZ778A5tQZV72rPV0=
github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE=
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M=
github.com/elastic/elastic-agent-libs v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8=
github.com/elastic/elastic-agent-system-metrics v0.11.0 h1:/bWrgTsHZWLUhdT7WPNuQDFkrSfm+A4qf6QDQnZo9d8=
github.com/elastic/elastic-agent-system-metrics v0.11.0/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
Expand Down
50 changes: 42 additions & 8 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ type Client struct {
// forwarded to this index. Otherwise, they will be dropped.
deadLetterIndex string

log *logp.Logger
log *logp.Logger
rlLogIndex *logp.RateLimitedLogger
rlLogIndexTryDeadLetter *logp.RateLimitedLogger
rlLogDeadLetter *logp.RateLimitedLogger
}

// clientSettings contains the settings for a client.
Expand Down Expand Up @@ -160,14 +163,48 @@ func NewClient(
observer = outputs.NewNilObserver()
}

log := logp.NewLogger("elasticsearch")

rtLogDeadLetter := logp.NewRateLimited(10*time.Second,
func(count uint64, d time.Duration) {
log.Errorf(
"Failed to deliver to dead letter index %d events in last %s. Look at the event log to view the event and cause.", count, d)
log.Errorw(fmt.Sprintf(
"iled to deliver to dead letter index %d events in last %s. Look at the event log to view the event and cause.", count, d),
logp.TypeKey, logp.EventType)
return

Check failure on line 175 in libbeat/outputs/elasticsearch/client.go

View workflow job for this annotation

GitHub Actions / lint (windows)

S1023: redundant `return` statement (gosimple)
})
rlLogIndex := logp.NewRateLimited(10*time.Second, func(count uint64, d time.Duration) {
log.Warnf(
"Failed to index %d events in last %s: events were dropped! Look at the event log to view the event and cause.",
count, d)
log.Warnw(fmt.Sprintf(
"Failed to index %d events in last %s: events were dropped", count, d),
logp.TypeKey, logp.EventType)
})
rlLogIndexTryDeadLetter := logp.NewRateLimited(10*time.Second, func(count uint64, d time.Duration) {
log.Warnf(
"Failed to index %d events in last %s: tried dead letter index. Look at the event log to view the event and cause.",
count, d)
log.Warnw(fmt.Sprintf(
"Failed to index %d events in last %s: tried dead letter index", count, d),
logp.TypeKey, logp.EventType)
})

rtLogDeadLetter.Start()
rlLogIndex.Start()
rlLogIndexTryDeadLetter.Start()
client := &Client{
conn: *conn,
indexSelector: s.indexSelector,
pipelineSelector: pipeline,
observer: observer,
deadLetterIndex: s.deadLetterIndex,

log: logp.NewLogger("elasticsearch"),
log: log,
rlLogDeadLetter: rtLogDeadLetter,
rlLogIndex: rlLogIndex,
rlLogIndexTryDeadLetter: rlLogIndexTryDeadLetter,
}

return client, nil
Expand Down Expand Up @@ -478,24 +515,21 @@ func (client *Client) applyItemStatus(
if encodedEvent.deadLetter {
// Fatal error while sending an already-failed event to the dead letter
// index, drop.
client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look at the event log to view the event and cause.", itemStatus)
client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
client.rlLogDeadLetter.Add()
stats.nonIndexable++
return false
}
if client.deadLetterIndex == "" {
// Fatal error and no dead letter index, drop.
client.log.Warnf("Cannot index event (status=%v): dropping event! Look at the event log to view the event and cause.", itemStatus)
client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
client.rlLogIndex.Add()
stats.nonIndexable++
return false
}
// Send this failure to the dead letter index and "retry".
// We count this as a "retryable failure", and then if the dead letter
// ingestion succeeds it is counted in the "deadLetter" counter
// rather than the "acked" counter.
client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Look at the event log to view the event and cause.", itemStatus)
client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType)
client.rlLogIndexTryDeadLetter.Add()
encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage))
}

Expand Down

0 comments on commit 67949a1

Please sign in to comment.