Skip to content

Commit

Permalink
Backport: Fix for errno 1734 when calling EvtNext (elastic#3112) (ela…
Browse files Browse the repository at this point in the history
…stic#3124)

* Fix for errno 1734 when calling EvtNext

When reading a batch of large event log records the Windows function
EvtNext returns errno 1734 (0x6C6) which is RPC_S_INVALID_BOUND ("The
array bounds are invalid."). This seems to be a bug in Windows because
there is no documentation about this behavior.

This fix handles the error by resetting the event log subscription
handle (so events are not lost) and then retries the EvtNext call
with maxHandles/2.

Fixes elastic#3076

(cherry picked from commit 226eb10)

* Add benchmark test for batch_read_size in Winlogbeat
  • Loading branch information
andrewkroh authored and tsg committed Dec 6, 2016
1 parent 1db70fa commit d06e42c
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v5.1.1...5.1[Check the HEAD diff]
*Filebeat*

*Winlogbeat*
- Fix for "The array bounds are invalid" error when reading large events. {issue}3076[3076]

////////////////////////////////////////////////////////////
Expand Down
137 changes: 137 additions & 0 deletions winlogbeat/eventlog/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// +build windows

package eventlog

import (
"flag"
"math/rand"
"os/exec"
"strconv"
"testing"
"time"

elog "github.com/andrewkroh/sys/windows/svc/eventlog"
"github.com/dustin/go-humanize"
)

// Benchmark tests with customized output. (`go test -v -benchtime 10s -benchtest .`)

var (
benchTest = flag.Bool("benchtest", false, "Run benchmarks for the eventlog package")
injectAmount = flag.Int("inject", 50000, "Number of events to inject before running benchmarks")
)

// TestBenchmarkBatchReadSize tests the performance of different
// batch_read_size values.
func TestBenchmarkBatchReadSize(t *testing.T) {
if !*benchTest {
t.Skip("-benchtest not enabled")
}

log, err := initLog(providerName, sourceName, eventCreateMsgFile)
if err != nil {
t.Fatal(err)
}
defer func() {
err := uninstallLog(providerName, sourceName, log)
if err != nil {
t.Fatal(err)
}
}()

// Increase the log size so that it can hold these large events.
output, err := exec.Command("wevtutil.exe", "sl", "/ms:1073741824", providerName).CombinedOutput()
if err != nil {
t.Fatal(err, string(output))
}

// Publish test messages:
for i := 0; i < *injectAmount; i++ {
err = log.Report(elog.Info, uint32(rng.Int63()%1000), []string{strconv.Itoa(i) + " " + randString(256)})
if err != nil {
t.Fatal("ReportEvent error", err)
}
}

setup := func(t testing.TB, batchReadSize int) (EventLog, func()) {
eventlog, err := newWinEventLog(map[string]interface{}{"name": providerName, "batch_read_size": batchReadSize})
if err != nil {
t.Fatal(err)
}
err = eventlog.Open(0)
if err != nil {
t.Fatal(err)
}
return eventlog, func() {
err := eventlog.Close()
if err != nil {
t.Fatal(err)
}
}
}

benchTest := func(batchSize int) {
var err error
result := testing.Benchmark(func(b *testing.B) {
eventlog, tearDown := setup(b, batchSize)
defer tearDown()
b.ResetTimer()

// Each iteration reads one batch.
for i := 0; i < b.N; i++ {
_, err = eventlog.Read()
if err != nil {
return
}
}
})

if err != nil {
t.Fatal(err)
return
}

t.Logf("batch_size=%v, total_events=%v, batch_time=%v, events_per_sec=%v, bytes_alloced_per_event=%v, total_allocs=%v",
batchSize,
result.N*batchSize,
time.Duration(result.NsPerOp()),
float64(batchSize)/time.Duration(result.NsPerOp()).Seconds(),
humanize.Bytes(result.MemBytes/(uint64(result.N)*uint64(batchSize))),
result.MemAllocs)
}

benchTest(10)
benchTest(100)
benchTest(500)
benchTest(1000)
}

// Utility Functions

var rng = rand.NewSource(time.Now().UnixNano())

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)

// https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-golang
func randString(n int) string {
b := make([]byte, n)
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := n-1, rng.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = rng.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
b[i] = letterBytes[idx]
i--
}
cache >>= letterIdxBits
remain--
}

return string(b)
}
26 changes: 23 additions & 3 deletions winlogbeat/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"strconv"
"syscall"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand All @@ -23,9 +24,14 @@ var (
detailf = logp.MakeDebug(detailSelector)
)

// dropReasons contains counters for the number of dropped events for each
// reason.
var dropReasons = expvar.NewMap("drop_reasons")
var (
// dropReasons contains counters for the number of dropped events for each
// reason.
dropReasons = expvar.NewMap("drop_reasons")

// readErrors contains counters for the read error types that occur.
readErrors = expvar.NewMap("read_errors")
)

// EventLog is an interface to a Windows Event Log.
type EventLog interface {
Expand Down Expand Up @@ -177,3 +183,17 @@ func isZero(i interface{}) bool {
}
return false
}

// incrementMetric increments a value in the specified expvar.Map. The key
// should be a windows syscall.Errno or a string. Any other types will be
// reported under the "other" key.
func incrementMetric(v *expvar.Map, key interface{}) {
switch t := key.(type) {
default:
v.Add("other", 1)
case string:
v.Add(t, 1)
case syscall.Errno:
v.Add(strconv.Itoa(int(t)), 1)
}
}
1 change: 1 addition & 0 deletions winlogbeat/eventlog/eventlogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (l *eventLogging) Close() error {
// by attempting to correct the error through closing and reopening the event
// log.
func (l *eventLogging) readRetryErrorHandler(err error) error {
incrementMetric(readErrors, err)
if errno, ok := err.(syscall.Errno); ok {
var reopen bool

Expand Down
14 changes: 13 additions & 1 deletion winlogbeat/eventlog/eventlogging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package eventlog

import (
"fmt"
"os/exec"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -35,6 +37,8 @@ const (

const allLevels = elog.Success | elog.AuditFailure | elog.AuditSuccess | elog.Error | elog.Info | elog.Warning

const gigabyte = 1 << 30

// Test messages.
var messages = map[uint32]struct {
eventType uint16
Expand Down Expand Up @@ -72,7 +76,7 @@ var oneTimeLogpInit sync.Once
func configureLogp() {
oneTimeLogpInit.Do(func() {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"eventlog", "eventlog_detail"})
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"eventlog"})
logp.Info("DEBUG enabled for eventlog.")
} else {
logp.LogInit(logp.LOG_WARNING, "", false, true, []string{})
Expand Down Expand Up @@ -143,6 +147,14 @@ func uninstallLog(provider, source string, log *elog.Log) error {
return errs.Err()
}

// setLogSize set the maximum number of bytes that an event log can hold.
func setLogSize(t testing.TB, provider string, sizeBytes int) {
output, err := exec.Command("wevtutil.exe", "sl", "/ms:"+strconv.Itoa(sizeBytes), providerName).CombinedOutput()
if err != nil {
t.Fatal("failed to set log size", err, string(output))
}
}

// Verify that all messages are read from the event log.
func TestRead(t *testing.T) {

Expand Down
61 changes: 36 additions & 25 deletions winlogbeat/eventlog/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package eventlog

import (
"fmt"
"strconv"
"syscall"
"time"

Expand All @@ -13,6 +12,7 @@ import (
"github.com/elastic/beats/winlogbeat/sys"
win "github.com/elastic/beats/winlogbeat/sys/wineventlog"
"github.com/joeshaw/multierror"
"github.com/pkg/errors"
"golang.org/x/sys/windows"
)

Expand Down Expand Up @@ -73,6 +73,7 @@ type winEventLog struct {
channelName string // Name of the channel from which to read.
subscription win.EvtHandle // Handle to the subscription.
maxRead int // Maximum number returned in one Read.
lastRead uint64 // Record number of the last read event.

render func(event win.EvtHandle) (string, error) // Function for rendering the event to XML.
renderBuf []byte // Buffer used for rendering event.
Expand Down Expand Up @@ -118,13 +119,8 @@ func (l *winEventLog) Open(recordNumber uint64) error {
}

func (l *winEventLog) Read() ([]Record, error) {
handles, err := win.EventHandles(l.subscription, l.maxRead)
if err == win.ERROR_NO_MORE_ITEMS {
detailf("%s No more events", l.logPrefix)
return nil, nil
}
if err != nil {
logp.Warn("%s EventHandles returned error %v", l.logPrefix, err)
handles, _, err := l.eventHandles(l.maxRead)
if err != nil || len(handles) == 0 {
return nil, err
}
defer func() {
Expand All @@ -145,17 +141,18 @@ func (l *winEventLog) Read() ([]Record, error) {
}
if err != nil && x == "" {
logp.Err("%s Dropping event with rendering error. %v", l.logPrefix, err)
reportDrop(err)
incrementMetric(dropReasons, err)
continue
}

r, err := l.buildRecordFromXML(x, err)
if err != nil {
logp.Err("%s Dropping event. %v", l.logPrefix, err)
reportDrop("unmarshal")
incrementMetric(dropReasons, err)
continue
}
records = append(records, r)
l.lastRead = r.RecordID
}

debugf("%s Read() is returning %d records", l.logPrefix, len(records))
Expand All @@ -167,6 +164,34 @@ func (l *winEventLog) Close() error {
return win.Close(l.subscription)
}

func (l *winEventLog) eventHandles(maxRead int) ([]win.EvtHandle, int, error) {
handles, err := win.EventHandles(l.subscription, maxRead)
switch err {
case nil:
if l.maxRead > maxRead {
debugf("%s Recovered from RPC_S_INVALID_BOUND error (errno 1734) "+
"by decreasing batch_read_size to %v", l.logPrefix, maxRead)
}
return handles, maxRead, nil
case win.ERROR_NO_MORE_ITEMS:
detailf("%s No more events", l.logPrefix)
return nil, maxRead, nil
case win.RPC_S_INVALID_BOUND:
incrementMetric(readErrors, err)
if err := l.Close(); err != nil {
return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND")
}
if err := l.Open(l.lastRead); err != nil {
return nil, 0, errors.Wrap(err, "failed to recover from RPC_S_INVALID_BOUND")
}
return l.eventHandles(maxRead / 2)
default:
incrementMetric(readErrors, err)
logp.Warn("%s EventHandles returned error %v", l.logPrefix, err)
return nil, 0, err
}
}

func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record, error) {
e, err := sys.UnmarshalEventXML([]byte(x))
if err != nil {
Expand Down Expand Up @@ -204,20 +229,6 @@ func (l *winEventLog) buildRecordFromXML(x string, recoveredErr error) (Record,
return r, nil
}

// reportDrop reports a dropped event log record and the reason as an expvar
// metric. The reason should be a windows syscall.Errno or a string. Any other
// types will be reported under the "other" key.
func reportDrop(reason interface{}) {
switch t := reason.(type) {
default:
dropReasons.Add("other", 1)
case string:
dropReasons.Add(t, 1)
case syscall.Errno:
dropReasons.Add(strconv.Itoa(int(t)), 1)
}
}

// newWinEventLog creates and returns a new EventLog for reading event logs
// using the Windows Event Log.
func newWinEventLog(options map[string]interface{}) (EventLog, error) {
Expand Down Expand Up @@ -283,7 +294,7 @@ func newWinEventLog(options map[string]interface{}) (EventLog, error) {
}

func init() {
// Register eventlogging API if it is available.
// Register wineventlog API if it is available.
available, _ := win.IsAvailable()
if available {
Register(winEventLogAPIName, 0, newWinEventLog, win.Channels)
Expand Down
Loading

0 comments on commit d06e42c

Please sign in to comment.