Skip to content

Commit

Permalink
Unify event, data and state handling in filebeat (#4171)
Browse files Browse the repository at this point in the history
The data and event handling has grown over time in filebeat. On the one hand, meta data and event data were mixed, and different structures to handle the data existed. The structures which existed were focused on the log prospector. This change brings unified data handling with a data object which separates event, meta data and state. This new model should allow adding new prospector types that use the same data objects.

The reason it ended up in the `util` package is that `data` is taken by the data folder and `common` would lead to too many conflicts with the `common` package in libbeat.
  • Loading branch information
ruflin authored and 7AC committed May 2, 2017
1 parent f7b88fb commit 4fddfd4
Showing 17 changed files with 213 additions and 223 deletions.
18 changes: 9 additions & 9 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
@@ -4,9 +4,9 @@ import (
"sync"
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"
"github.com/elastic/beats/filebeat/util"
)

type spoolerOutlet struct {
@@ -19,12 +19,12 @@ type spoolerOutlet struct {

type publisherChannel struct {
done chan struct{}
ch chan []*input.Data
ch chan []*util.Data
}

type registrarLogger struct {
done chan struct{}
ch chan<- []*input.Data
ch chan<- []*util.Data
}

type finishedLogger struct {
@@ -44,7 +44,7 @@ func newSpoolerOutlet(
}
}

func (o *spoolerOutlet) OnEvent(event *input.Data) bool {
func (o *spoolerOutlet) OnEvent(data *util.Data) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
@@ -61,20 +61,20 @@ func (o *spoolerOutlet) OnEvent(event *input.Data) bool {
}
atomic.StoreInt32(&o.isOpen, 0)
return false
case o.spooler.Channel <- event:
case o.spooler.Channel <- data:
return true
}
}

func newPublisherChannel() *publisherChannel {
return &publisherChannel{
done: make(chan struct{}),
ch: make(chan []*input.Data, 1),
ch: make(chan []*util.Data, 1),
}
}

// Close closes the done channel, which Send selects on to detect shutdown.
// It must be called at most once: closing an already-closed channel panics.
func (c *publisherChannel) Close() { close(c.done) }
func (c *publisherChannel) Send(events []*input.Data) bool {
func (c *publisherChannel) Send(events []*util.Data) bool {
select {
case <-c.done:
// set ch to nil, so no more events will be send after channel close signal
@@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
}

// Close closes the done channel, which Published selects on to detect shutdown.
// It must be called at most once: closing an already-closed channel panics.
func (l *registrarLogger) Close() { close(l.done) }
func (l *registrarLogger) Published(events []*input.Data) bool {
func (l *registrarLogger) Published(events []*util.Data) bool {
select {
case <-l.done:
// set ch to nil, so no more events will be send after channel close signal
@@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger {
return &finishedLogger{wg}
}

func (l *finishedLogger) Published(events []*input.Data) bool {
func (l *finishedLogger) Published(events []*util.Data) bool {
for range events {
l.wg.Done()
}
6 changes: 3 additions & 3 deletions filebeat/channel/interface.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package channel

import "github.com/elastic/beats/filebeat/input"
import "github.com/elastic/beats/filebeat/util"

// Outleter is the outlet for a prospector
type Outleter interface {
SetSignal(signal <-chan struct{})
OnEventSignal(event *input.Data) bool
OnEvent(event *input.Data) bool
OnEventSignal(data *util.Data) bool
OnEvent(data *util.Data) bool
Copy() Outleter
}
14 changes: 7 additions & 7 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ import (
"sync"
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/util"
)

// Outlet struct is used to be passed to an object which needs an outlet
@@ -19,13 +19,13 @@ type Outlet struct {
wg *sync.WaitGroup // Use for counting active events
done <-chan struct{}
signal <-chan struct{}
channel chan *input.Data
channel chan *util.Data
isOpen int32 // atomic indicator
}

func NewOutlet(
done <-chan struct{},
c chan *input.Data,
c chan *util.Data,
wg *sync.WaitGroup,
) *Outlet {
return &Outlet{
@@ -42,7 +42,7 @@ func (o *Outlet) SetSignal(signal <-chan struct{}) {
o.signal = signal
}

func (o *Outlet) OnEvent(event *input.Data) bool {
func (o *Outlet) OnEvent(data *util.Data) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
@@ -59,15 +59,15 @@ func (o *Outlet) OnEvent(event *input.Data) bool {
}
atomic.StoreInt32(&o.isOpen, 0)
return false
case o.channel <- event:
case o.channel <- data:
return true
}
}

// OnEventSignal can be stopped by the signal that is set with SetSignal
// This does not close the outlet. Only OnEvent does close the outlet.
// If OnEventSignal is used, it must be ensured that only one producer is used.
func (o *Outlet) OnEventSignal(event *input.Data) bool {
func (o *Outlet) OnEventSignal(data *util.Data) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
@@ -84,7 +84,7 @@ func (o *Outlet) OnEventSignal(event *input.Data) bool {
}
o.signal = nil
return false
case o.channel <- event:
case o.channel <- data:
return true
}
}
33 changes: 12 additions & 21 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
@@ -21,8 +21,8 @@ import (
"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
@@ -38,15 +38,14 @@ var (

type Outlet interface {
SetSignal(signal <-chan struct{})
OnEventSignal(event *input.Data) bool
OnEvent(event *input.Data) bool
OnEventSignal(data *util.Data) bool
OnEvent(data *util.Data) bool
}

type Harvester struct {
config harvesterConfig
state file.State
states *file.States
prospectorChan chan *input.Event
file source.FileSource /* the file being watched */
fileReader *LogFile
encodingFactory encoding.EncodingFactory
@@ -119,29 +118,21 @@ func (h *Harvester) open() error {

// forwardEvent updates the prospector state and forwards the event to the spooler
// All state updates done by the prospector itself are synchronous to make sure no states are overwritten
func (h *Harvester) forwardEvent(event *input.Event) error {
func (h *Harvester) forwardEvent(data *util.Data) error {

// Add additional prospector meta data to the event
event.InputType = h.config.InputType
event.Pipeline = h.config.Pipeline
event.Module = h.config.Module
event.Fileset = h.config.Fileset
data.Meta.Pipeline = h.config.Pipeline
data.Meta.Module = h.config.Module
data.Meta.Fileset = h.config.Fileset

if event.Data != nil {
event.Data[common.EventMetadataKey] = h.config.EventMetadata
}

eventHolder := event.GetData()
//run the filters before sending to spooler
if event.Bytes > 0 {
eventHolder.Event = h.processors.Run(eventHolder.Event)
}
if data.HasEvent() {
data.Event[common.EventMetadataKey] = h.config.EventMetadata

if eventHolder.Event == nil {
eventHolder.Metadata.Bytes = 0
// run the filters before sending to spooler
data.Event = h.processors.Run(data.Event)
}

ok := h.outlet.OnEventSignal(&eventHolder)
ok := h.outlet.OnEventSignal(data)

if !ok {
logp.Info("Prospector outlet closed")
40 changes: 21 additions & 19 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
@@ -11,8 +11,8 @@ import (
"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
@@ -135,41 +135,44 @@ func (h *Harvester) Harvest(r reader.Reader) {
state := h.getState()

// Create state event
event := input.NewEvent(state)
data := util.NewData()
if h.file.HasState() {
data.SetState(state)
}

text := string(message.Content)

// Check if data should be added to event. Only export non empty events.
if !message.IsEmpty() && h.shouldExportLine(text) {
event.Bytes = message.Bytes

event.Data = common.MapStr{
data.Event = common.MapStr{
"@timestamp": common.Time(message.Ts),
"source": h.state.Source,
"offset": h.state.Offset, // Offset here is the offset before the starting char.
"type": h.config.DocumentType,
"input_type": h.config.InputType,
}
event.Data.DeepUpdate(message.Fields)
data.Event.DeepUpdate(message.Fields)

// Check if json fields exist
var jsonFields common.MapStr
if fields, ok := event.Data["json"]; ok {
if fields, ok := data.Event["json"]; ok {
jsonFields = fields.(common.MapStr)
}

if h.config.JSON != nil && len(jsonFields) > 0 {
h.mergeJSONFields(event.Data, jsonFields, &text)
h.mergeJSONFields(data.Event, jsonFields, &text)
} else if &text != nil {
if event.Data == nil {
event.Data = common.MapStr{}
if data.Event == nil {
data.Event = common.MapStr{}
}
event.Data["message"] = text
data.Event["message"] = text
}
}

// Always send event to update state, also if lines was skipped
// Stop harvester in case of an error
if !h.sendEvent(event) {
if !h.sendEvent(data) {
return
}
// Update state of harvester as successfully sent
@@ -192,11 +195,11 @@ func (h *Harvester) Stop() {

// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.Event) bool {
func (h *Harvester) sendEvent(data *util.Data) bool {
if h.file.HasState() {
h.states.Update(event.State)
h.states.Update(h.state)
}
err := h.forwardEvent(event)
err := h.forwardEvent(data)
return err == nil
}

@@ -212,12 +215,11 @@ func (h *Harvester) SendStateUpdate() {
}

logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
h.states.Update(h.state)

event := input.NewEvent(h.state)
h.states.Update(event.State)

data := event.GetData()
h.outlet.OnEvent(&data)
d := util.NewData()
d.SetState(h.state)
h.outlet.OnEvent(d)
}

// shouldExportLine decides if the line is exported or not based on
82 changes: 0 additions & 82 deletions filebeat/input/event.go

This file was deleted.

Loading

0 comments on commit 4fddfd4

Please sign in to comment.