From 5e4e7e5d772525363fd87d57cc777871831caf5f Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Thu, 11 Jul 2024 12:14:58 +0530 Subject: [PATCH] [filebeat][winlog] implement status reporter for winlog input (#40163) * chore: implement status reporter for winlog * fix: remove test file, add changelog * chore: update changelog, add status reporting to swtich * fix: lint * chore: reuse same errors * fix the error --- CHANGELOG.next.asciidoc | 1 + filebeat/input/winlog/input.go | 25 ++++++++++++++++++++----- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 01af73726d4..2e791772ba8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -45,6 +45,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Tag events that come from a filestream in "take over" mode. {pull}39828[39828] - Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] - Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121] +- Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163] *Heartbeat* diff --git a/filebeat/input/winlog/input.go b/filebeat/input/winlog/input.go index ab925cbdd3c..945dd0e3476 100644 --- a/filebeat/input/winlog/input.go +++ b/filebeat/input/winlog/input.go @@ -26,6 +26,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/timed" @@ -39,6 +40,10 @@ type eventlogRunner struct{} const pluginName = "winlog" +const channelNotFoundError = "Encountered channel not found error when opening Windows Event Log" +const eventLogReadingError = "Error occurred while reading from Windows Event Log" +const resetError = "Error resetting Windows Event Log handle" + // Plugin create a stateful input Plugin collecting logs from Windows Event Logs. func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin { return input.Plugin{ @@ -99,6 +104,7 @@ func (eventlogRunner) Run( // Flag used to detect repeat "channel not found" errors, eliminating log spam. channelNotFoundErrDetected := false + ctx.UpdateStatus(status.Running, "") runLoop: for { @@ -109,6 +115,9 @@ runLoop: evtCheckpoint := initCheckpoint(log, cursor) openErr := api.Open(evtCheckpoint) + // Mark the input running. + // Status will be changed to "Degraded" if any error are encountered during opening/reading + ctx.UpdateStatus(status.Running, "") switch { case eventlog.IsRecoverable(openErr): @@ -117,14 +126,16 @@ runLoop: continue case !api.IsFile() && eventlog.IsChannelNotFound(openErr): if !channelNotFoundErrDetected { - log.Errorw("Encountered channel not found error when opening Windows Event Log", "error", openErr) + log.Errorw(channelNotFoundError, "error", openErr) } else { - log.Debugw("Encountered channel not found error when opening Windows Event Log", "error", openErr) + log.Debugw(channelNotFoundError, "error", openErr) } + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("%s: %v", channelNotFoundError, openErr)) channelNotFoundErrDetected = true _ = timed.Wait(cancelCtx, 5*time.Second) continue case openErr != nil: + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("failed to open Windows Event Log channel %q: %v", api.Channel(), openErr)) return fmt.Errorf("failed to open Windows Event Log channel %q: %w", api.Channel(), openErr) } channelNotFoundErrDetected = false @@ -137,14 +148,16 @@ runLoop: if eventlog.IsRecoverable(err) { log.Errorw("Encountered recoverable error when reading from Windows Event Log", "error", err) if resetErr := api.Reset(); resetErr != nil { - log.Errorw("Error resetting Windows Event Log handle", "error", resetErr) + log.Errorw(resetError, "error", resetErr) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("%s: %v", resetError, resetErr)) } continue runLoop } if !api.IsFile() && eventlog.IsChannelNotFound(err) { log.Errorw("Encountered channel not found error when reading from Windows Event Log", "error", err) if resetErr := api.Reset(); resetErr != nil { - log.Errorw("Error resetting Windows Event Log handle", "error", resetErr) + log.Errorw(resetError, "error", resetErr) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("%s: %v", resetError, resetErr)) } continue runLoop } @@ -160,7 +173,8 @@ runLoop: return nil } - log.Errorw("Error occurred while reading from Windows Event Log", "error", err) + log.Errorw(eventLogReadingError, "error", err) + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("%s: %v", eventLogReadingError, err)) return err } if len(records) == 0 { @@ -173,6 +187,7 @@ runLoop: if err := publisher.Publish(event, record.Offset); err != nil { // Publisher indicates disconnect when returning an error. // stop trying to publish records and quit + ctx.UpdateStatus(status.Degraded, fmt.Sprintf("Error occurred while publishing from winlog: %v", err)) return err } }