Read archived .evtx files with Winlogbeat (#11361)
This gives Winlogbeat the ability to read from archived .evtx files. The `name` parameter recognizes when its value is an absolute path and then uses the appropriate APIs to open the file and ingest its contents. To support the use case of reading from a file and then exiting when there are no more events (`ERROR_NO_MORE_ITEMS`), I added a config option that changes the reader's behavior from waiting for more events to stopping.
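
As a rough illustration of the path check described above, here is a minimal sketch. It assumes the decision can be made with Go's `filepath.IsAbs`; the helper names `isFilePath` and `exampleAbsPath` are hypothetical and do not come from this commit.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// isFilePath is a hypothetical helper: it treats an absolute path as a
// reference to an .evtx file and anything else as an event log name.
func isFilePath(name string) bool {
	return filepath.IsAbs(name)
}

// exampleAbsPath builds an absolute path that is valid on the current
// platform, so the demo behaves the same on Windows and elsewhere.
func exampleAbsPath() string {
	abs, err := filepath.Abs("Security-2019.01.evtx")
	if err != nil {
		panic(err)
	}
	return abs
}

func main() {
	for _, name := range []string{"Application", "Security", exampleAbsPath()} {
		fmt.Printf("name=%q readFromFile=%v\n", name, isFilePath(name))
	}
}
```

Note that `filepath.IsAbs` follows the rules of the platform the program runs on, so a Windows-style path such as `c:\backup\Security-2019.01.evtx` is only reported as absolute when the program runs on Windows.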

I also had to add a `shutdown_timeout` option to make Winlogbeat wait for events to finish publishing before exiting.

To keep it simple, globs are not supported. Supporting them would have required the introduction of a "prospector" to continuously monitor the glob for new, moved, or deleted files.
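
Because globs are not expanded by Winlogbeat itself, one possible workaround is to expand the pattern outside of it and run Winlogbeat once per file. The sketch below is only an illustration under that assumption: `commandsFor` is a hypothetical helper, the executable and config names are borrowed from the FAQ example in this commit, and whether reusing one registry file across runs suits your setup is something to verify yourself.

```go
package main

import (
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
)

// commandsFor (hypothetical) expands pattern and returns one argument list
// per matching file. Each list passes the file's absolute path to Winlogbeat
// through the EVTX_FILE setting.
func commandsFor(exe, config, pattern string) ([][]string, error) {
	files, err := filepath.Glob(pattern)
	if err != nil {
		return nil, err
	}
	var cmds [][]string
	for _, f := range files {
		abs, err := filepath.Abs(f)
		if err != nil {
			return nil, err
		}
		cmds = append(cmds, []string{exe, "-e", "-c", config, "-E", "EVTX_FILE=" + abs})
	}
	return cmds, nil
}

func main() {
	if len(os.Args) != 2 {
		fmt.Println("usage: evtx-batch <glob pattern>")
		return
	}
	cmds, err := commandsFor(`.\winlogbeat.exe`, `.\winlogbeat-evtx.yml`, os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, "expanding pattern:", err)
		os.Exit(1)
	}
	// Run the invocations one after another; each run exits on its own
	// when the config sets no_more_events to stop.
	for _, args := range cmds {
		cmd := exec.Command(args[0], args[1:]...)
		cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
		if err := cmd.Run(); err != nil {
			fmt.Fprintf(os.Stderr, "%s failed: %v\n", args[len(args)-1], err)
		}
	}
}
```

Unlike a prospector, this does not watch for files that appear after the pattern is expanded; it only covers a one-off batch import.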

    winlogbeat.event_logs:
      - name: ${EVTX_FILE}
        no_more_events: stop

    winlogbeat.shutdown_timeout: 30s
    winlogbeat.registry_file: evtx-registry.yml

    output.elasticsearch.hosts: ['http://localhost:9200']

Closes #4450
andrewkroh authored Apr 9, 2019
1 parent 17edc90 commit dfabb06
Showing 23 changed files with 700 additions and 65 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -143,6 +143,10 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Functionbeat*

*Winlogbeat*

- Add support for reading from .evtx files. {issue}4450[4450]

==== Deprecated

*Affecting all Beats*
9 changes: 7 additions & 2 deletions winlogbeat/_meta/beat.reference.yml
@@ -14,13 +14,18 @@
# in the directory in which it was started.
#winlogbeat.registry_file: .winlogbeat.yml

# The maximum amount of time Winlogbeat should wait for events to finish
# publishing when shutting down.
#winlogbeat.shutdown_timeout: 0s

# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
# dictionaries.
#
# The supported keys are name (required), tags, fields, fields_under_root,
# forwarded, ignore_older, level, event_id, provider, and include_xml. Please
# visit the documentation for the complete details of each option.
# forwarded, ignore_older, level, no_more_events, event_id, provider, and
# include_xml. Please visit the documentation for the complete details of each
# option.
# https://go.es.io/WinlogbeatConfig
winlogbeat.event_logs:
- name: Application
8 changes: 8 additions & 0 deletions winlogbeat/_meta/fields.common.yml
@@ -4,6 +4,14 @@
Fields from the Windows Event Log.
fields:
# Candidate to add to ECS

- name: log.file.path
type: keyword
required: false
description: >
The name of the file the event was read from when Winlogbeat is
reading directly from an .evtx file.
- name: event.code
type: keyword
required: false
81 changes: 81 additions & 0 deletions winlogbeat/beater/acker.go
@@ -0,0 +1,81 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"context"
"sync"

"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/winlogbeat/checkpoint"
)

type eventACKer struct {
active *atomic.Int
wg *sync.WaitGroup
checkpoint *checkpoint.Checkpoint
}

func newEventACKer(checkpoint *checkpoint.Checkpoint) *eventACKer {
return &eventACKer{
active: atomic.NewInt(0),
wg: &sync.WaitGroup{},
checkpoint: checkpoint,
}
}

// ACKEvents receives callbacks from the publisher for every event that is
// published. It persists the record number of the last event in each
// event log.
func (a *eventACKer) ACKEvents(data []interface{}) {
states := make(map[string]*checkpoint.EventLogState)

for _, datum := range data {
if st, ok := datum.(checkpoint.EventLogState); ok {
states[st.Name] = &st
}
}

for _, st := range states {
a.checkpoint.PersistState(*st)
}

// Mark events as done (subtract).
a.active.Add(-1 * len(data))
a.wg.Add(-1 * len(data))
}

// Wait waits for all events to be ACKed or for the context to be done.
func (a *eventACKer) Wait(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
a.wg.Wait()
}()
<-ctx.Done()
}

// Add adds to the number of active events.
func (a *eventACKer) Add(delta int) {
a.active.Add(delta)
a.wg.Add(delta)
}

// Active returns the number of active events (published but not yet ACKed).
func (a *eventACKer) Active() int {
return a.active.Load()
}
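
The bounded wait that `Wait` provides can be tried in isolation. The following is a minimal sketch rather than the commit's code: `waitOrTimeout` and `simulate` are hypothetical names, and the sketch signals completion through a channel instead of a derived cancelable context so that it can report whether the timeout fired.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrTimeout (hypothetical helper) blocks until every pending event has
// been acknowledged or the timeout expires. It reports whether all events
// were acknowledged in time.
func waitOrTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	select {
	case <-done:
		return true
	case <-ctx.Done():
		return false
	}
}

// simulate (hypothetical) registers `pending` events, acknowledges each one
// after ackDelayMs milliseconds, and waits at most timeoutMs milliseconds.
func simulate(pending, ackDelayMs, timeoutMs int) bool {
	var wg sync.WaitGroup
	wg.Add(pending)
	for i := 0; i < pending; i++ {
		go func() {
			time.Sleep(time.Duration(ackDelayMs) * time.Millisecond)
			wg.Done()
		}()
	}
	return waitOrTimeout(&wg, time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println("fast ACKs finished in time:", simulate(3, 10, 2000))
	fmt.Println("slow ACKs finished in time:", simulate(3, 2000, 50))
}
```

In both this sketch and the `Wait` method, the goroutine blocked in `wg.Wait()` keeps running after a timeout until the counter reaches zero. That is acceptable when the process is about to exit, which is the shutdown case this commit targets.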
16 changes: 11 additions & 5 deletions winlogbeat/beater/eventlogger.go
@@ -18,6 +18,7 @@
package beater

import (
"io"
"time"

"github.com/elastic/beats/libbeat/beat"
@@ -81,6 +82,7 @@ func (e *eventLogger) run(
done <-chan struct{},
pipeline beat.Pipeline,
state checkpoint.EventLogState,
acker *eventACKer,
) {
api := e.source

@@ -118,7 +120,7 @@ func (e *eventLogger) run(

debugf("EventLog[%s] opened successfully", api.Name())

for {
for stop := false; !stop; {
select {
case <-done:
return
@@ -127,19 +129,23 @@ func (e *eventLogger) run(

// Read from the event.
records, err := api.Read()
if err != nil {
switch err {
case nil:
case io.EOF:
// Graceful stop.
stop = true
default:
logp.Warn("EventLog[%s] Read() error: %v", api.Name(), err)
break
return
}

debugf("EventLog[%s] Read() returned %d records", api.Name(), len(records))
if len(records) == 0 {
// TODO: Consider implementing notifications using
// NotifyChangeEventLog instead of polling.
time.Sleep(time.Second)
continue
}

acker.Add(len(records))
for _, lr := range records {
client.Publish(lr.ToEvent())
}
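
The control flow of the new read loop, where `io.EOF` means "stop gracefully" and any other error aborts, can be sketched independently of the Windows APIs. Everything below is illustrative: `fakeLog` and `drain` are hypothetical stand-ins, and the real loop additionally watches the `done` channel and sleeps when a read returns no records.

```go
package main

import (
	"fmt"
	"io"
)

// fakeLog is a hypothetical stand-in for an event log reader. It hands out
// its batches one at a time and then reports io.EOF.
type fakeLog struct{ batches [][]string }

func (f *fakeLog) Read() ([]string, error) {
	if len(f.batches) == 0 {
		return nil, io.EOF
	}
	b := f.batches[0]
	f.batches = f.batches[1:]
	return b, nil
}

// drain reads until the source reports io.EOF and returns every record it
// saw. Any other error ends the loop immediately and is returned.
func drain(r interface{ Read() ([]string, error) }) ([]string, error) {
	var published []string
	for stop := false; !stop; {
		records, err := r.Read()
		switch err {
		case nil:
		case io.EOF:
			// Graceful stop: finish this iteration, then leave the loop.
			stop = true
		default:
			return published, err
		}
		published = append(published, records...)
	}
	return published, nil
}

func main() {
	log := &fakeLog{batches: [][]string{{"event-1", "event-2"}, {"event-3"}}}
	records, err := drain(log)
	fmt.Println(len(records), "records read, err =", err)
}
```

Using a `stop` flag instead of an immediate `return` lets records delivered together with the end-of-log signal still be handled before the loop exits.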
26 changes: 16 additions & 10 deletions winlogbeat/beater/winlogbeat.go
@@ -22,6 +22,7 @@ Winlogbeat. The main event loop is implemented in this package.
package beater

import (
"context"
"fmt"
"sync"
"time"
@@ -126,20 +127,15 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error {
return err
}

acker := newEventACKer(eb.checkpoint)
persistedState := eb.checkpoint.States()

// Initialize metrics.
initMetrics("total")

// setup global event ACK handler
err := eb.pipeline.SetACKHandler(beat.PipelineACKHandler{
ACKLastEvents: func(data []interface{}) {
for _, datum := range data {
if st, ok := datum.(checkpoint.EventLogState); ok {
eb.checkpoint.PersistState(st)
}
}
},
ACKEvents: acker.ACKEvents,
})
if err != nil {
return err
@@ -151,11 +147,20 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error {

// Start a goroutine for each event log.
wg.Add(1)
go eb.processEventLog(&wg, log, state)
go eb.processEventLog(&wg, log, state, acker)
}

wg.Wait()
eb.checkpoint.Shutdown()
defer eb.checkpoint.Shutdown()

if eb.config.ShutdownTimeout > 0 {
logp.Info("Shutdown will wait max %v for the remaining %v events to publish.",
eb.config.ShutdownTimeout, acker.Active())
ctx, cancel := context.WithTimeout(context.Background(), eb.config.ShutdownTimeout)
defer cancel()
acker.Wait(ctx)
}

return nil
}

@@ -171,7 +176,8 @@ func (eb *Winlogbeat) processEventLog(
wg *sync.WaitGroup,
logger *eventLogger,
state checkpoint.EventLogState,
acker *eventACKer,
) {
defer wg.Done()
logger.run(eb.done, eb.pipeline, state)
logger.run(eb.done, eb.pipeline, state, acker)
}
8 changes: 5 additions & 3 deletions winlogbeat/config/config.go
@@ -20,6 +20,7 @@ package config

import (
"fmt"
"time"

"github.com/joeshaw/multierror"

@@ -39,8 +40,9 @@ var (

// WinlogbeatConfig contains all of Winlogbeat configuration data.
type WinlogbeatConfig struct {
EventLogs []*common.Config `config:"event_logs"`
RegistryFile string `config:"registry_file"`
EventLogs []*common.Config `config:"event_logs"`
RegistryFile string `config:"registry_file"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
}

// Validate validates the WinlogbeatConfig data and returns an error describing
@@ -49,7 +51,7 @@ func (ebc WinlogbeatConfig) Validate() error {
var errs multierror.Errors

if len(ebc.EventLogs) == 0 {
errs = append(errs, fmt.Errorf("At least one event log must be "+
errs = append(errs, fmt.Errorf("at least one event log must be "+
"configured as part of event_logs"))
}

2 changes: 1 addition & 1 deletion winlogbeat/config/config_test.go
@@ -60,7 +60,7 @@ func TestConfigValidate(t *testing.T) {
},
{
WinlogbeatConfig{},
"1 error: At least one event log must be configured as part of " +
"1 error: at least one event log must be configured as part of " +
"event_logs",
},
}
37 changes: 37 additions & 0 deletions winlogbeat/docs/faq.asciidoc
@@ -19,3 +19,40 @@ Prior to the hostname configuration stage, during OS installation any event log
records generated may have a randomly assigned hostname.

include::{libbeat-dir}/docs/shared-faq.asciidoc[]

[float]
[[reading-from-evtx]]
=== Can Winlogbeat read from .evtx files?

Yes, Winlogbeat can ingest archived .evtx files. When you set the `name`
parameter to the absolute path of an event log file, Winlogbeat reads from
that file. Here's an example. First, create a new config file for Winlogbeat.

winlogbeat-evtx.yml
[source,yaml]
----
winlogbeat.event_logs:
- name: ${EVTX_FILE} <1>
no_more_events: stop <2>
winlogbeat.shutdown_timeout: 30s <3>
winlogbeat.registry_file: evtx-registry.yml <4>
output.elasticsearch.hosts: ['http://localhost:9200']
----
1. `name` will be set to the value of the `EVTX_FILE` environment variable.
2. `no_more_events` sets the behavior of Winlogbeat when Windows reports that
there are no more events to read. We want Winlogbeat to _stop_ rather than
_wait_ since this is an archived file that will not receive any more events.
3. `shutdown_timeout` controls the maximum amount of time Winlogbeat will wait
to finish publishing the events to Elasticsearch after stopping because it
reached the end of the log.
4. A separate registry file is used to avoid overwriting the default registry
file. You can delete this file after you're done ingesting the .evtx data.

Now execute Winlogbeat and wait for it to complete. It will exit when it's done.

[source,sh]
----
.\winlogbeat.exe -e -c .\winlogbeat-evtx.yml -E EVTX_FILE=c:\backup\Security-2019.01.evtx
----
12 changes: 12 additions & 0 deletions winlogbeat/docs/fields.asciidoc
@@ -3823,6 +3823,18 @@ Fields from the Windows Event Log.
*`log.file.path`*::
+
--
type: keyword
required: False
The name of the file the event was read from when Winlogbeat is reading directly from an .evtx file.
--
*`event.code`*::
+
--