Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat][CometD] Resolve Retry Error Handling #34327

Merged
merged 15 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fixing system tests not returning expected content encoding for azure blob storage input. {pull}34412[34412]
- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478]
- Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550]
- Fix the `cometd` input worker being closed when a network connection issue or an EOF error occurs. {issue}34326[34326] {pull}34327[34327]

*Heartbeat*

Expand Down
96 changes: 73 additions & 23 deletions x-pack/filebeat/input/cometd/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ package cometd
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -23,6 +27,9 @@ import (

// inputName is the name under which this input is known ("cometd").
const inputName = "cometd"

// retryInterval is the minimum duration that must elapse between
// successive pub/sub client retries.
const retryInterval = 30 * time.Second

// Run starts the input worker then returns. Only the first invocation
Expand All @@ -37,44 +44,90 @@ func (in *cometdInput) Run() {
defer in.workerWg.Done()
defer in.workerCancel()
in.b = bay.Bayeux{}
in.creds, err = bay.GetSalesforceCredentials(in.authParams)
if err != nil {
in.log.Errorw("not able to get access token", "error", err)
return
}
if err := in.run(); err != nil {
in.log.Errorw("got error while running input", "error", err)
return

rt := rate.NewLimiter(rate.Every(retryInterval), 1)

for in.workerCtx.Err() == nil {
// Rate limit.
if err := rt.Wait(in.workerCtx); err != nil {
continue
}

// Creating a new channel for cometd input.
in.msgCh = make(chan bay.MaybeMsg, 1)

in.creds, err = bay.GetSalesforceCredentials(in.authParams)
if err != nil {
in.log.Errorw("not able to get access token", "error", err)
continue
}

if err := in.run(); err != nil {
if in.workerCtx.Err() == nil {
in.log.Errorw("Restarting failed CometD input worker.", "error", err)
continue
}

// Log any non-cancellation error before stopping.
if !errors.Is(err, context.Canceled) {
in.log.Errorw("got error while running input", "error", err)
}
}
}
}()
})
}

func (in *cometdInput) run() error {
in.msgCh = in.b.Channel(in.workerCtx, in.msgCh, "-1", *in.creds, in.config.ChannelName)
ctx, cancel := context.WithCancel(in.workerCtx)
defer cancel()
// Ticker with a 5-second period, used to avoid logging too many warnings.
ticker := time.NewTicker(5 * time.Second)
in.msgCh = in.b.Channel(ctx, in.msgCh, "-1", *in.creds, in.config.ChannelName)
for e := range in.msgCh {
if e.Failed() {
return fmt.Errorf("error collecting events: %w", e.Err)
// If the bayeux library returns a recoverable error, do not close the input;
// instead, continue and log a connection warning.
if !strings.Contains(e.Error(), "trying again") {
return fmt.Errorf("error collecting events: %w", e.Err)
}
// Log a warning at most once every 5 seconds to avoid too many unnecessary logs.
select {
case <-ticker.C:
in.log.Errorw("Retrying...! facing issue while collecting data from CometD", "error", e.Error())
default:
}
} else if !e.Msg.Successful {
var event event
// To handle the last response where the object received was empty
if e.Msg.Data.Payload == nil {
return nil
}

var msg []byte
var err error
// Convert json.RawMessage response to []byte
msg, err := e.Msg.Data.Payload.MarshalJSON()
if err != nil {
return fmt.Errorf("JSON error: %w", err)
if e.Msg.Data.Payload != nil {
msg, err = e.Msg.Data.Payload.MarshalJSON()
if err != nil {
in.log.Errorw("invalid JSON", "error", err)
continue
}
} else if e.Msg.Data.Object != nil {
msg, err = e.Msg.Data.Object.MarshalJSON()
if err != nil {
in.log.Errorw("invalid JSON", "error", err)
continue
}
} else {
// To handle the last response where the object received was empty
return nil
}

// Extract event IDs from json.RawMessage
err = json.Unmarshal(e.Msg.Data.Payload, &event)
err = json.Unmarshal(msg, &event)
if err != nil {
return fmt.Errorf("error while parsing JSON: %w", err)
in.log.Errorw("error while parsing JSON", "error", err)
continue
}
if ok := in.outlet.OnEvent(makeEvent(event.EventId, e.Msg.Channel, string(msg))); !ok {
in.log.Debug("OnEvent returned false. Stopping input worker.")
cancel()
return fmt.Errorf("error ingesting data to elasticsearch")
}
}
Expand Down Expand Up @@ -133,9 +186,6 @@ func NewInput(
authParams: authParams,
}

// Creating a new channel for cometd input.
in.msgCh = make(chan bay.MaybeMsg, 1)

// Build outlet for events.
in.outlet, err = connector.Connect(cfg)
if err != nil {
Expand Down
Loading