diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3287a4853a9..f0e3a951133 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -173,6 +173,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - [Autodiscover] Handle input-not-finished errors in config reload. {pull}20915[20915] - Explicitly detect missing variables in autodiscover configuration, log them at the debug level. {issue}20568[20568] {pull}20898[20898] - Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197] +- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21259[21258] *Auditbeat* diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index ceff10751de..67013dc6a71 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -26,6 +26,9 @@ import ( const ( pluginName = "o365audit" fieldsPrefix = pluginName + + // How long to retry when a fatal error is encountered in the input. + failureRetryInterval = time.Minute * 5 ) type o365input struct { @@ -107,6 +110,34 @@ func (inp *o365input) Run( src cursor.Source, cursor cursor.Cursor, publisher cursor.Publisher, +) error { + for ctx.Cancelation.Err() == nil { + err := inp.runOnce(ctx, src, cursor, publisher) + if err == nil { + break + } + if ctx.Cancelation.Err() != err && err != context.Canceled { + msg := common.MapStr{} + msg.Put("error.message", err.Error()) + msg.Put("event.kind", "pipeline_error") + event := beat.Event{ + Timestamp: time.Now(), + Fields: msg, + } + publisher.Publish(event, nil) + ctx.Logger.Errorf("Input failed: %v", err) + ctx.Logger.Infof("Restarting in %v", failureRetryInterval) + time.Sleep(failureRetryInterval) + } + } + return nil +} + +func (inp *o365input) runOnce( + ctx v2.Context, + src cursor.Source, + cursor cursor.Cursor, + publisher cursor.Publisher, ) error { stream := src.(*stream) tenantID, contentType := stream.tenantID, stream.contentType @@ -156,18 +187,7 @@ func (inp *o365input) Run( } log.Infow("Start fetching events", "cursor", start) - err = poller.Run(action) - if err != nil && ctx.Cancelation.Err() != err && err != context.Canceled { - msg := common.MapStr{} - msg.Put("error.message", err.Error()) - msg.Put("event.kind", "pipeline_error") - event := beat.Event{ - Timestamp: time.Now(), - Fields: msg, - } - publisher.Publish(event, nil) - } - return err + return poller.Run(action) } func initCheckpoint(log *logp.Logger, c cursor.Cursor, maxRetention time.Duration) checkpoint {