diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5b4bcd961e8..075c07f1f39 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v5.0.1...master[Check the HEAD diff] - Fix registry cleanup issue when files falling under ignore_older after restart. {issue}2818[2818] *Winlogbeat* +- Fix for "The array bounds are invalid" error when reading large events. {issue}3076[3076] ==== Added diff --git a/winlogbeat/eventlog/eventlog.go b/winlogbeat/eventlog/eventlog.go index c888a04dece..2839cfb1b59 100644 --- a/winlogbeat/eventlog/eventlog.go +++ b/winlogbeat/eventlog/eventlog.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "strconv" + "syscall" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -23,9 +24,14 @@ var ( detailf = logp.MakeDebug(detailSelector) ) -// dropReasons contains counters for the number of dropped events for each -// reason. -var dropReasons = expvar.NewMap("drop_reasons") +var ( + // dropReasons contains counters for the number of dropped events for each + // reason. + dropReasons = expvar.NewMap("drop_reasons") + + // readErrors contains counters for the read error types that occur. + readErrors = expvar.NewMap("read_errors") +) // EventLog is an interface to a Windows Event Log. type EventLog interface { @@ -177,3 +183,17 @@ func isZero(i interface{}) bool { } return false } + +// incrementMetric increments a value in the specified expvar.Map. The key +// should be a windows syscall.Errno or a string. Any other types will be +// reported under the "other" key. +func incrementMetric(v *expvar.Map, key interface{}) { + switch t := key.(type) { + default: + v.Add("other", 1) + case string: + v.Add(t, 1) + case syscall.Errno: + v.Add(strconv.Itoa(int(t)), 1) + } +} diff --git a/winlogbeat/eventlog/eventlogging.go b/winlogbeat/eventlog/eventlogging.go index e220c7c1266..ba77d7085fa 100644 --- a/winlogbeat/eventlog/eventlogging.go +++ b/winlogbeat/eventlog/eventlogging.go @@ -195,6 +195,7 @@ func (l *eventLogging) Close() error { // by attempting to correct the error through closing and reopening the event // log. func (l *eventLogging) readRetryErrorHandler(err error) error { + incrementMetric(readErrors, err) if errno, ok := err.(syscall.Errno); ok { var reopen bool diff --git a/winlogbeat/eventlog/eventlogging_test.go b/winlogbeat/eventlog/eventlogging_test.go index 5e2c6d9320b..98be3c28aa7 100644 --- a/winlogbeat/eventlog/eventlogging_test.go +++ b/winlogbeat/eventlog/eventlogging_test.go @@ -4,6 +4,8 @@ package eventlog import ( "fmt" + "os/exec" + "strconv" "strings" "sync" "testing" @@ -35,6 +37,8 @@ const ( const allLevels = elog.Success | elog.AuditFailure | elog.AuditSuccess | elog.Error | elog.Info | elog.Warning +const gigabyte = 1 << 30 + // Test messages. var messages = map[uint32]struct { eventType uint16 @@ -72,7 +76,7 @@ var oneTimeLogpInit sync.Once func configureLogp() { oneTimeLogpInit.Do(func() { if testing.Verbose() { - logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"eventlog", "eventlog_detail"}) + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"eventlog"}) logp.Info("DEBUG enabled for eventlog.") } else { logp.LogInit(logp.LOG_WARNING, "", false, true, []string{}) @@ -143,6 +147,14 @@ func uninstallLog(provider, source string, log *elog.Log) error { return errs.Err() } +// setLogSize set the maximum number of bytes that an event log can hold. +func setLogSize(t testing.TB, provider string, sizeBytes int) { + output, err := exec.Command("wevtutil.exe", "sl", "/ms:"+strconv.Itoa(sizeBytes), providerName).CombinedOutput() + if err != nil { + t.Fatal("failed to set log size", err, string(output)) + } +} + // Verify that all messages are read from the event log. func TestRead(t *testing.T) { diff --git a/winlogbeat/eventlog/wineventlog.go b/winlogbeat/eventlog/wineventlog.go index 7340e2b0bec..7caab618bf1 100644 --- a/winlogbeat/eventlog/wineventlog.go +++ b/winlogbeat/eventlog/wineventlog.go @@ -4,7 +4,6 @@ package eventlog import ( "fmt" - "strconv" "syscall" "time" @@ -13,6 +12,7 @@ import ( "github.com/elastic/beats/winlogbeat/sys" win "github.com/elastic/beats/winlogbeat/sys/wineventlog" "github.com/joeshaw/multierror" + "github.com/pkg/errors" "golang.org/x/sys/windows" ) @@ -73,6 +73,7 @@ type winEventLog struct { channelName string // Name of the channel from which to read. subscription win.EvtHandle // Handle to the subscription. maxRead int // Maximum number returned in one Read. + lastRead uint64 // Record number of the last read event. render func(event win.EvtHandle) (string, error) // Function for rendering the event to XML. renderBuf []byte // Buffer used for rendering event. @@ -118,13 +119,8 @@ func (l *winEventLog) Open(recordNumber uint64) error { } func (l *winEventLog) Read() ([]Record, error) { - handles, err := win.EventHandles(l.subscription, l.maxRead) - if err == win.ERROR_NO_MORE_ITEMS { - detailf("%s No more events", l.logPrefix) - return nil, nil - } - if err != nil { - logp.Warn("%s EventHandles returned error %v", l.logPrefix, err) + handles, _, err := l.eventHandles(l.maxRead) + if err != nil || len(handles) == 0 { return nil, err } defer func() { @@ -145,17 +141,18 @@ func (l *winEventLog) Read() ([]Record, error) { } if err != nil && x == "" { logp.Err("%s Dropping event with rendering error. %v", l.logPrefix, err) - reportDrop(err) + incrementMetric(dropReasons, err) continue } r, err := l.buildRecordFromXML(x, err) if err != nil { logp.Err("%s Dropping event. %v", l.logPrefix, err) - reportDrop("unmarshal") + incrementMetric(dropReasons, err) continue } records = append(records, r) + l.lastRead = r.RecordID } debugf("%s Read() is returning %d records", l.logPrefix, len(records)) @@ -167,6 +164,34 @@ func (l *winEventLog) Close() error { return win.Close(l.subscription) } +func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) { + handles, err := win.EventHandles(l.subscription, maxRead) + switch err { + case nil: + if l.maxRead > maxRead { + debugf("%s Recovered from RPC_S_INVALID_BOUND error (errno 1734) "+ + "by decreasing batch_read_size to %v", l.logPrefix, maxRead) + } + return handles, maxRead, nil + case win.ERROR_NO_MORE_ITEMS: + detailf("%s No more events", l.logPrefix) + return nil, maxRead, nil + case win.RPC_S_INVALID_BOUND: + incrementMetric(readErrors, err) + if err := l.Close(); err != nil { + return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND") + } + if err := l.Open(l.lastRead); err != nil { + return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND") + } + return l.eventHandles(maxRead / 2) + default: + incrementMetric(readErrors, err) + logp.Warn("%s EventHandles returned error %v", l.logPrefix, err) + return nil, 0, err + } +} + func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, error) { e, err := sys.UnmarshalEventXML([]byte(x)) if err != nil { @@ -204,20 +229,6 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, return r, nil } -// reportDrop reports a dropped event log record and the reason as an expvar -// metric. The reason should be a windows syscall.Errno or a string. Any other -// types will be reported under the "other" key. -func reportDrop(reason interface{}) { - switch t := reason.(type) { - default: - dropReasons.Add("other", 1) - case string: - dropReasons.Add(t, 1) - case syscall.Errno: - dropReasons.Add(strconv.Itoa(int(t)), 1) - } -} - // newWinEventLog creates and returns a new EventLog for reading event logs // using the Windows Event Log. func newWinEventLog(options map[string]interface{}) (EventLog, error) { @@ -283,7 +294,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) { } func init() { - // Register eventlogging API if it is available. + // Register wineventlog API if it is available. available, _ := win.IsAvailable() if available { Register(winEventLogAPIName, 0, newWinEventLog, win.Channels) diff --git a/winlogbeat/eventlog/wineventlog_test.go b/winlogbeat/eventlog/wineventlog_test.go index af74a1bdb51..68896477343 100644 --- a/winlogbeat/eventlog/wineventlog_test.go +++ b/winlogbeat/eventlog/wineventlog_test.go @@ -3,8 +3,11 @@ package eventlog import ( + "expvar" + "strconv" "testing" + elog "github.com/andrewkroh/sys/windows/svc/eventlog" "github.com/stretchr/testify/assert" ) @@ -52,3 +55,69 @@ func TestWinEventLogBatchReadSize(t *testing.T) { assert.Len(t, records, batchReadSize) } + +// TestReadLargeBatchSize tests reading from an event log using a large +// read_batch_size parameter. When combined with large messages this causes +// EvtNext (wineventlog.EventRecords) to fail with RPC_S_INVALID_BOUND error. +func TestReadLargeBatchSize(t *testing.T) { + configureLogp() + log, err := initLog(providerName, sourceName, eventCreateMsgFile) + if err != nil { + t.Fatal(err) + } + defer func() { + err := uninstallLog(providerName, sourceName, log) + if err != nil { + t.Fatal(err) + } + }() + + setLogSize(t, providerName, gigabyte) + + // Publish large test messages. + totalEvents := 1000 + for i := 0; i < totalEvents; i++ { + err = log.Report(elog.Info, uint32(i%1000), []string{strconv.Itoa(i) + " " + randString(31800)}) + if err != nil { + t.Fatal("ReportEvent error", err) + } + } + + eventlog, err := newWinEventLog(map[string]interface{}{"name": providerName, "batch_read_size": 1024}) + if err != nil { + t.Fatal(err) + } + err = eventlog.Open(0) + if err != nil { + t.Fatal(err) + } + defer func() { + err := eventlog.Close() + if err != nil { + t.Fatal(err) + } + }() + + var eventCount int + for eventCount < totalEvents { + records, err := eventlog.Read() + if err != nil { + t.Fatal("read error", err) + } + if len(records) == 0 { + t.Fatal("read returned 0 records") + } + eventCount += len(records) + } + + t.Logf("number of records returned: %v", eventCount) + + wineventlog := eventlog.(*winEventLog) + assert.Equal(t, 1024, wineventlog.maxRead) + + expvar.Do(func(kv expvar.KeyValue) { + if kv.Key == "read_errors" { + t.Log(kv) + } + }) +} diff --git a/winlogbeat/sys/wineventlog/syscall_windows.go b/winlogbeat/sys/wineventlog/syscall_windows.go index 3ee3f736dd4..a02d62ef0cd 100644 --- a/winlogbeat/sys/wineventlog/syscall_windows.go +++ b/winlogbeat/sys/wineventlog/syscall_windows.go @@ -13,6 +13,7 @@ const ( ERROR_INSUFFICIENT_BUFFER syscall.Errno = 122 ERROR_NO_MORE_ITEMS syscall.Errno = 259 ERROR_NONE_MAPPED syscall.Errno = 1332 + RPC_S_INVALID_BOUND syscall.Errno = 1734 ERROR_INVALID_OPERATION syscall.Errno = 4317 ERROR_EVT_MESSAGE_NOT_FOUND syscall.Errno = 15027 ERROR_EVT_MESSAGE_ID_NOT_FOUND syscall.Errno = 15028 diff --git a/winlogbeat/sys/wineventlog/wineventlog_windows.go b/winlogbeat/sys/wineventlog/wineventlog_windows.go index f4f38452428..138c0da5aa0 100644 --- a/winlogbeat/sys/wineventlog/wineventlog_windows.go +++ b/winlogbeat/sys/wineventlog/wineventlog_windows.go @@ -125,6 +125,10 @@ func Subscribe( // handles available to return. Close must be called on each returned EvtHandle // when finished with the handle. func EventHandles(subscription EvtHandle, maxHandles int) ([]EvtHandle, error) { + if maxHandles < 1 { + return nil, fmt.Errorf("maxHandles must be greater than 0") + } + eventHandles := make([]EvtHandle, maxHandles) var numRead uint32