From 4fddfd43c510e12bf739b5b4c5c21dcff3152923 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Tue, 2 May 2017 20:15:01 +0200 Subject: [PATCH] Unify event, data and state handling in filebeat (#4171) The data and event handling in filebeat has grown over time. On the one hand, metadata and event data were mixed; on the other, different structures to handle the data existed, all of them focused on the log prospector. This change introduces unified data handling through a data object that separates the event, the metadata and the state. The new model should make it possible to add new prospector types that reuse the same data objects. It ended up in the `util` package because `data` is already taken by the data folder and `common` would lead to too many conflicts with the `common` package in libbeat. --- filebeat/beater/channels.go | 18 ++-- filebeat/channel/interface.go | 6 +- filebeat/channel/outlet.go | 14 ++-- filebeat/harvester/harvester.go | 33 +++----- filebeat/harvester/log.go | 40 ++++----- filebeat/input/event.go | 82 ------------------- filebeat/input/event_test.go | 15 ---- filebeat/prospector/prospector.go | 9 +- .../prospector/prospector_log_other_test.go | 10 +-- filebeat/publisher/async.go | 8 +- filebeat/publisher/publisher.go | 20 ++--- filebeat/publisher/publisher_test.go | 25 ++---- filebeat/publisher/sync.go | 8 +- filebeat/registrar/registrar.go | 20 ++--- filebeat/spooler/spooler.go | 24 +++--- filebeat/util/data.go | 65 +++++++++++++++ filebeat/util/data_test.go | 39 +++++++++ 17 files changed, 213 insertions(+), 223 deletions(-) delete mode 100644 filebeat/input/event.go delete mode 100644 filebeat/input/event_test.go create mode 100644 filebeat/util/data.go create mode 100644 filebeat/util/data_test.go diff --git a/filebeat/beater/channels.go b/filebeat/beater/channels.go index 13c4edc5aaee..51883c563e85 100644 --- a/filebeat/beater/channels.go +++ b/filebeat/beater/channels.go @@ -4,9 +4,9 @@ import ( "sync" "sync/atomic" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/registrar" "github.com/elastic/beats/filebeat/spooler" + "github.com/elastic/beats/filebeat/util" ) type spoolerOutlet struct { @@ -19,12 +19,12 @@ type spoolerOutlet struct { type publisherChannel struct { done chan struct{} - ch chan []*input.Data + ch chan []*util.Data } type registrarLogger struct { done chan struct{} - ch chan<- []*input.Data + ch chan<- []*util.Data } type finishedLogger struct { @@ -44,7 +44,7 @@ func newSpoolerOutlet( } } -func (o *spoolerOutlet) OnEvent(event *input.Data) bool { +func (o *spoolerOutlet) OnEvent(data *util.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false } @@ -61,7 +61,7 @@ func (o *spoolerOutlet) OnEvent(event *input.Data) bool { } atomic.StoreInt32(&o.isOpen, 0) return false - case o.spooler.Channel <- event: + case o.spooler.Channel <- data: return true } } @@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.Data) bool { func newPublisherChannel() *publisherChannel { return &publisherChannel{ done: make(chan struct{}), - ch: make(chan []*input.Data, 1), + ch: make(chan []*util.Data, 1), } } func (c *publisherChannel) Close() { close(c.done) } -func (c *publisherChannel) Send(events []*input.Data) bool { +func (c *publisherChannel) Send(events []*util.Data) bool { select { case <-c.done: // set ch to nil, so no more events will be send after channel close signal @@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger { } func (l *registrarLogger) Close() { close(l.done) }
-func (l *registrarLogger) Published(events []*input.Data) bool { +func (l *registrarLogger) Published(events []*util.Data) bool { select { case <-l.done: // set ch to nil, so no more events will be send after channel close signal @@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger { return &finishedLogger{wg} } -func (l *finishedLogger) Published(events []*input.Data) bool { +func (l *finishedLogger) Published(events []*util.Data) bool { for range events { l.wg.Done() } diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 496a24f23094..c4e7d9ad596e 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -1,11 +1,11 @@ package channel -import "github.com/elastic/beats/filebeat/input" +import "github.com/elastic/beats/filebeat/util" // Outleter is the outlet for a prospector type Outleter interface { SetSignal(signal <-chan struct{}) - OnEventSignal(event *input.Data) bool - OnEvent(event *input.Data) bool + OnEventSignal(data *util.Data) bool + OnEvent(data *util.Data) bool Copy() Outleter } diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index 6bbc6f8074dd..0ba373f911ec 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -4,7 +4,7 @@ import ( "sync" "sync/atomic" - "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/util" ) // Outlet struct is used to be passed to an object which needs an outlet @@ -19,13 +19,13 @@ type Outlet struct { wg *sync.WaitGroup // Use for counting active events done <-chan struct{} signal <-chan struct{} - channel chan *input.Data + channel chan *util.Data isOpen int32 // atomic indicator } func NewOutlet( done <-chan struct{}, - c chan *input.Data, + c chan *util.Data, wg *sync.WaitGroup, ) *Outlet { return &Outlet{ @@ -42,7 +42,7 @@ func (o *Outlet) SetSignal(signal <-chan struct{}) { o.signal = signal } -func (o *Outlet) OnEvent(event *input.Data) bool { +func (o *Outlet) OnEvent(data *util.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false @@ -59,7 +59,7 @@ func (o *Outlet) OnEvent(event *input.Data) bool { } atomic.StoreInt32(&o.isOpen, 0) return false - case o.channel <- event: + case o.channel <- data: return true } } @@ -67,7 +67,7 @@ func (o *Outlet) OnEvent(event *input.Data) bool { // OnEventSignal can be stopped by the signal that is set with SetSignal // This does not close the outlet. Only OnEvent does close the outlet. // If OnEventSignal is used, it must be ensured that only one producer is used. 
-func (o *Outlet) OnEventSignal(event *input.Data) bool { +func (o *Outlet) OnEventSignal(data *util.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false @@ -84,7 +84,7 @@ func (o *Outlet) OnEventSignal(event *input.Data) bool { } o.signal = nil return false - case o.channel <- event: + case o.channel <- data: return true } } diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 2423eb5e8f8c..34bc6d3fd7fc 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -21,8 +21,8 @@ import ( "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" "github.com/elastic/beats/filebeat/harvester/source" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" @@ -38,15 +38,14 @@ var ( type Outlet interface { SetSignal(signal <-chan struct{}) - OnEventSignal(event *input.Data) bool - OnEvent(event *input.Data) bool + OnEventSignal(data *util.Data) bool + OnEvent(data *util.Data) bool } type Harvester struct { config harvesterConfig state file.State states *file.States - prospectorChan chan *input.Event file source.FileSource /* the file being watched */ fileReader *LogFile encodingFactory encoding.EncodingFactory @@ -119,29 +118,21 @@ func (h *Harvester) open() error { // updateState updates the prospector state and forwards the event to the spooler // All state updates done by the prospector itself are synchronous to make sure not states are overwritten -func (h *Harvester) forwardEvent(event *input.Event) error { +func (h *Harvester) forwardEvent(data *util.Data) error { // Add additional prospector meta data to the event - event.InputType = h.config.InputType - event.Pipeline = h.config.Pipeline - event.Module = h.config.Module - event.Fileset = h.config.Fileset + data.Meta.Pipeline = h.config.Pipeline + data.Meta.Module = h.config.Module + data.Meta.Fileset = h.config.Fileset - if event.Data != nil { - event.Data[common.EventMetadataKey] = h.config.EventMetadata - } - - eventHolder := event.GetData() - //run the filters before sending to spooler - if event.Bytes > 0 { - eventHolder.Event = h.processors.Run(eventHolder.Event) - } + if data.HasEvent() { + data.Event[common.EventMetadataKey] = h.config.EventMetadata - if eventHolder.Event == nil { - eventHolder.Metadata.Bytes = 0 + // run the filters before sending to spooler + data.Event = h.processors.Run(data.Event) } - ok := h.outlet.OnEventSignal(&eventHolder) + ok := h.outlet.OnEventSignal(data) if !ok { logp.Info("Prospector outlet closed") diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index c1300e7008db..8e009f8cf39f 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -11,8 +11,8 @@ import ( "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/harvester/source" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/jsontransform" "github.com/elastic/beats/libbeat/logp" @@ -135,41 +135,44 @@ func (h *Harvester) Harvest(r reader.Reader) { state := h.getState() // Create state event - event := input.NewEvent(state) + data := util.NewData() + 
if h.file.HasState() { + data.SetState(state) + } + text := string(message.Content) // Check if data should be added to event. Only export non empty events. if !message.IsEmpty() && h.shouldExportLine(text) { - event.Bytes = message.Bytes - event.Data = common.MapStr{ + data.Event = common.MapStr{ "@timestamp": common.Time(message.Ts), "source": h.state.Source, "offset": h.state.Offset, // Offset here is the offset before the starting char. "type": h.config.DocumentType, "input_type": h.config.InputType, } - event.Data.DeepUpdate(message.Fields) + data.Event.DeepUpdate(message.Fields) // Check if json fields exist var jsonFields common.MapStr - if fields, ok := event.Data["json"]; ok { + if fields, ok := data.Event["json"]; ok { jsonFields = fields.(common.MapStr) } if h.config.JSON != nil && len(jsonFields) > 0 { - h.mergeJSONFields(event.Data, jsonFields, &text) + h.mergeJSONFields(data.Event, jsonFields, &text) } else if &text != nil { - if event.Data == nil { - event.Data = common.MapStr{} + if data.Event == nil { + data.Event = common.MapStr{} } - event.Data["message"] = text + data.Event["message"] = text } } // Always send event to update state, also if lines was skipped // Stop harvester in case of an error - if !h.sendEvent(event) { + if !h.sendEvent(data) { return } // Update state of harvester as successfully sent @@ -192,11 +195,11 @@ func (h *Harvester) Stop() { // sendEvent sends event to the spooler channel // Return false if event was not sent -func (h *Harvester) sendEvent(event *input.Event) bool { +func (h *Harvester) sendEvent(data *util.Data) bool { if h.file.HasState() { - h.states.Update(event.State) + h.states.Update(h.state) } - err := h.forwardEvent(event) + err := h.forwardEvent(data) return err == nil } @@ -212,12 +215,11 @@ func (h *Harvester) SendStateUpdate() { } logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) + h.states.Update(h.state) - event := input.NewEvent(h.state) - h.states.Update(event.State) - - data := event.GetData() - h.outlet.OnEvent(&data) + d := util.NewData() + d.SetState(h.state) + h.outlet.OnEvent(d) } // shouldExportLine decides if the line is exported or not based on diff --git a/filebeat/input/event.go b/filebeat/input/event.go deleted file mode 100644 index 77ac5d33c9d7..000000000000 --- a/filebeat/input/event.go +++ /dev/null @@ -1,82 +0,0 @@ -package input - -import ( - "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/libbeat/common" -) - -// Event is sent to the output and must contain all relevant information -type Event struct { - EventMeta - Data common.MapStr // Use in readers to add data to the event -} - -type EventMeta struct { - Pipeline string - Fileset string - Module string - InputType string - Bytes int - State file.State -} - -type Data struct { - Event common.MapStr - Metadata EventMeta -} - -func NewEvent(state file.State) *Event { - return &Event{ - EventMeta: EventMeta{ - State: state, - }, - } - -} - -func (e *Event) ToMapStr() common.MapStr { - - event := e.Data - if event == nil { - event = common.MapStr{} - } - - if e.Fileset != "" && e.Module != "" { - event["fileset"] = common.MapStr{ - "name": e.Fileset, - "module": e.Module, - } - } - - return event -} - -func (e *Event) GetData() Data { - return Data{ - Event: e.ToMapStr(), - Metadata: EventMeta{ - Pipeline: e.Pipeline, - Bytes: e.Bytes, - State: e.State, - Fileset: e.Fileset, - Module: e.Module, - }, - } -} - -// Metadata creates a common.MapStr containing the metadata to -// be associated 
with the event. -func (eh *Data) GetMetadata() common.MapStr { - if eh.Metadata.Pipeline != "" { - return common.MapStr{ - "pipeline": eh.Metadata.Pipeline, - } - } - return nil -} - -// HasData returns true if the event itself contains data -// Events without data are only state updates -func (eh *Data) HasData() bool { - return eh.Metadata.Bytes > 0 -} diff --git a/filebeat/input/event_test.go b/filebeat/input/event_test.go deleted file mode 100644 index 2f1249b9d029..000000000000 --- a/filebeat/input/event_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package input - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEventToMapStr(t *testing.T) { - // Test 'fields' is not present when it is nil. - event := Event{} - mapStr := event.ToMapStr() - _, found := mapStr["fields"] - assert.False(t, found) -} diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index bc7a26900354..0f6648a92677 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -11,9 +11,9 @@ import ( "github.com/elastic/beats/filebeat/channel" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/prospector/stdin" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -174,11 +174,10 @@ func (p *Prospector) updateState(state file.State) error { // Update first internal state p.states.Update(state) - eventHolder := input.NewEvent(state).GetData() - // Set to 0 as these are state updates only - eventHolder.Metadata.Bytes = 0 + data := util.NewData() + data.SetState(state) - ok := p.outlet.OnEvent(&eventHolder) + ok := p.outlet.OnEvent(data) if !ok { logp.Info("Prospector outlet closed") diff --git a/filebeat/prospector/prospector_log_other_test.go b/filebeat/prospector/prospector_log_other_test.go index 70981d7ef5c1..2d926dddd237 100644 --- a/filebeat/prospector/prospector_log_other_test.go +++ b/filebeat/prospector/prospector_log_other_test.go @@ -5,8 +5,8 @@ package prospector import ( "testing" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/common/match" "github.com/elastic/beats/filebeat/channel" @@ -156,7 +156,7 @@ func TestInit(t *testing.T) { // TestOutlet is an empty outlet for testing type TestOutlet struct{} -func (o TestOutlet) OnEvent(event *input.Data) bool { return true } -func (o TestOutlet) OnEventSignal(event *input.Data) bool { return true } -func (o TestOutlet) SetSignal(signal <-chan struct{}) {} -func (o TestOutlet) Copy() channel.Outleter { return o } +func (o TestOutlet) OnEvent(event *util.Data) bool { return true } +func (o TestOutlet) OnEventSignal(event *util.Data) bool { return true } +func (o TestOutlet) SetSignal(signal <-chan struct{}) {} +func (o TestOutlet) Copy() channel.Outleter { return o } diff --git a/filebeat/publisher/async.go b/filebeat/publisher/async.go index 35b304429ba9..455f55f4df73 100644 --- a/filebeat/publisher/async.go +++ b/filebeat/publisher/async.go @@ -5,7 +5,7 @@ import ( "sync/atomic" "time" - "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" ) @@ -13,7 +13,7 @@ import ( 
type asyncLogPublisher struct { pub publisher.Publisher client publisher.Client - in chan []*input.Data + in chan []*util.Data out SuccessLogger // list of in-flight batches @@ -29,7 +29,7 @@ type asyncLogPublisher struct { type eventsBatch struct { next *eventsBatch flag int32 - events []*input.Data + events []*util.Data } type batchList struct { @@ -50,7 +50,7 @@ const ( ) func newAsyncLogPublisher( - in chan []*input.Data, + in chan []*util.Data, out SuccessLogger, pub publisher.Publisher, ) *asyncLogPublisher { diff --git a/filebeat/publisher/publisher.go b/filebeat/publisher/publisher.go index 0d982f2f4e07..282dfb40bd56 100644 --- a/filebeat/publisher/publisher.go +++ b/filebeat/publisher/publisher.go @@ -3,7 +3,7 @@ package publisher import ( "errors" - "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -24,12 +24,12 @@ type LogPublisher interface { type SuccessLogger interface { // Published will be run after events have been acknowledged by the outputs. - Published(events []*input.Data) bool + Published(events []*util.Data) bool } func New( async bool, - in chan []*input.Data, + in chan []*util.Data, out SuccessLogger, pub publisher.Publisher, ) LogPublisher { @@ -46,13 +46,13 @@ var ( // getDataEvents returns all events which contain data (not only state updates) // together with their associated metadata -func getDataEvents(events []*input.Data) (dataEvents []common.MapStr, meta []common.MapStr) { - dataEvents = make([]common.MapStr, 0, len(events)) - meta = make([]common.MapStr, 0, len(events)) - for _, event := range events { - if event.HasData() { - dataEvents = append(dataEvents, event.Event) - meta = append(meta, event.GetMetadata()) +func getDataEvents(entries []*util.Data) (dataEvents []common.MapStr, meta []common.MapStr) { + dataEvents = make([]common.MapStr, 0, len(entries)) + meta = make([]common.MapStr, 0, len(entries)) + for _, data := range entries { + if data.HasEvent() { + dataEvents = append(dataEvents, data.GetEvent()) + meta = append(meta, data.GetMetadata()) } } return dataEvents, meta diff --git a/filebeat/publisher/publisher_test.go b/filebeat/publisher/publisher_test.go index b771be90277a..16530c217792 100644 --- a/filebeat/publisher/publisher_test.go +++ b/filebeat/publisher/publisher_test.go @@ -7,7 +7,7 @@ import ( "sync" "testing" - "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/common/op" pubtest "github.com/elastic/beats/libbeat/publisher/testing" "github.com/stretchr/testify/assert" @@ -15,27 +15,20 @@ import ( type collectLogger struct { wg *sync.WaitGroup - events [][]*input.Data + events [][]*util.Data } -func (l *collectLogger) Published(events []*input.Data) bool { +func (l *collectLogger) Published(events []*util.Data) bool { l.wg.Done() l.events = append(l.events, events) return true } -func makeEvents(name string, n int) []*input.Data { - var events []*input.Data +func makeEvents(name string, n int) []*util.Data { + var events []*util.Data for i := 0; i < n; i++ { - event := &input.Event{ - EventMeta: input.EventMeta{ - InputType: "log", - Bytes: 100, - }, - } - - eventHolder := event.GetData() - events = append(events, &eventHolder) + data := util.NewData() + events = append(events, data) } return events } @@ -56,7 +49,7 @@ func TestPublisherModes(t *testing.T) { wg := sync.WaitGroup{} - pubChan := make(chan 
[]*input.Data, len(test.order)+1) + pubChan := make(chan []*util.Data, len(test.order)+1) collector := &collectLogger{&wg, nil} client := pubtest.NewChanClient(0) @@ -64,7 +57,7 @@ func TestPublisherModes(t *testing.T) { pubtest.PublisherWithClient(client)) pub.Start() - var events [][]*input.Data + var events [][]*util.Data for i := range test.order { tmp := makeEvents(fmt.Sprintf("msg: %v", i), 1) wg.Add(1) diff --git a/filebeat/publisher/sync.go b/filebeat/publisher/sync.go index 2fbe2e2b3ab5..34781c76340f 100644 --- a/filebeat/publisher/sync.go +++ b/filebeat/publisher/sync.go @@ -3,7 +3,7 @@ package publisher import ( "sync" - "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" ) @@ -11,7 +11,7 @@ import ( type syncLogPublisher struct { pub publisher.Publisher client publisher.Client - in chan []*input.Data + in chan []*util.Data out SuccessLogger done chan struct{} @@ -19,7 +19,7 @@ type syncLogPublisher struct { } func newSyncLogPublisher( - in chan []*input.Data, + in chan []*util.Data, out SuccessLogger, pub publisher.Publisher, ) *syncLogPublisher { @@ -51,7 +51,7 @@ func (p *syncLogPublisher) Start() { } func (p *syncLogPublisher) Publish() error { - var events []*input.Data + var events []*util.Data select { case <-p.done: return sigPublisherStop diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index e08a741d3680..ed1a2dc395a9 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -7,17 +7,16 @@ import ( "path/filepath" "sync" - cfg "github.com/elastic/beats/filebeat/config" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/publisher" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/paths" ) type Registrar struct { - Channel chan []*input.Data + Channel chan []*util.Data out publisher.SuccessLogger done chan struct{} registryFile string // Path to the Registry File @@ -38,7 +37,7 @@ func New(registryFile string, out publisher.SuccessLogger) (*Registrar, error) { registryFile: registryFile, done: make(chan struct{}), states: file.NewStates(), - Channel: make(chan []*input.Data, 1), + Channel: make(chan []*util.Data, 1), out: out, wg: sync.WaitGroup{}, } @@ -153,7 +152,7 @@ func (r *Registrar) Run() { }() for { - var events []*input.Data + var events []*util.Data select { case <-r.done: @@ -183,17 +182,16 @@ func (r *Registrar) Run() { } // processEventStates gets the states from the events and writes them to the registrar state -func (r *Registrar) processEventStates(events []*input.Data) { +func (r *Registrar) processEventStates(events []*util.Data) { logp.Debug("registrar", "Processing %d events", len(events)) - // skip stdin - for _, event := range events { + for _, data := range events { - // skip stdin - if event.Metadata.InputType == cfg.StdinInputType { + // skip events without state + if !data.HasState() { continue } - r.states.Update(event.Metadata.State) + r.states.Update(data.GetState()) statesUpdate.Add(1) } } diff --git a/filebeat/spooler/spooler.go b/filebeat/spooler/spooler.go index 5279581292d9..a67d232efc55 100644 --- a/filebeat/spooler/spooler.go +++ b/filebeat/spooler/spooler.go @@ -5,7 +5,7 @@ import ( "time" cfg "github.com/elastic/beats/filebeat/config" - 
"github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/logp" ) @@ -16,16 +16,16 @@ const channelSize = 16 // Spooler aggregates the events and sends the aggregated data to the publisher. type Spooler struct { - Channel chan *input.Data // Channel is the input to the Spooler. + Channel chan *util.Data // Channel is the input to the Spooler. config spoolerConfig output Output // batch event output on flush - spool []*input.Data // Events being held by the Spooler. + spool []*util.Data // Events being held by the Spooler. wg sync.WaitGroup // WaitGroup used to control the shutdown. } // Output spooler sends event to through Send method type Output interface { - Send(events []*input.Data) bool + Send(events []*util.Data) bool } type spoolerConfig struct { @@ -40,13 +40,13 @@ func New( out Output, ) (*Spooler, error) { return &Spooler{ - Channel: make(chan *input.Data, channelSize), + Channel: make(chan *util.Data, channelSize), config: spoolerConfig{ idleTimeout: config.IdleTimeout, spoolSize: config.SpoolSize, }, output: out, - spool: make([]*input.Data, 0, config.SpoolSize), + spool: make([]*util.Data, 0, config.SpoolSize), }, nil } @@ -70,12 +70,12 @@ func (s *Spooler) run() { for { select { - case event, ok := <-s.Channel: + case data, ok := <-s.Channel: if !ok { return } - if event != nil { - flushed := s.queue(event) + if data != nil { + flushed := s.queue(data) if flushed { // Stop timer and drain channel. See https://golang.org/pkg/time/#Timer.Reset if !timer.Stop() { @@ -111,9 +111,9 @@ func (s *Spooler) Stop() { // queue queues a single event to be spooled. If the queue reaches spoolSize // while calling this method then all events in the queue will be flushed to // the publisher. -func (s *Spooler) queue(event *input.Data) bool { +func (s *Spooler) queue(data *util.Data) bool { flushed := false - s.spool = append(s.spool, event) + s.spool = append(s.spool, data) if len(s.spool) == cap(s.spool) { debugf("Flushing spooler because spooler full. 
Events flushed: %v", len(s.spool)) s.flush() @@ -131,7 +131,7 @@ func (s *Spooler) flush() int { } // copy buffer - tmpCopy := make([]*input.Data, count) + tmpCopy := make([]*util.Data, count) copy(tmpCopy, s.spool) // clear buffer diff --git a/filebeat/util/data.go b/filebeat/util/data.go new file mode 100644 index 000000000000..2105d93f2a9c --- /dev/null +++ b/filebeat/util/data.go @@ -0,0 +1,65 @@ +package util + +import ( + "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/libbeat/common" +) + +type Data struct { + Event common.MapStr + state file.State + Meta Meta +} + +type Meta struct { + Pipeline string + Fileset string + Module string +} + +func NewData() *Data { + return &Data{} +} + +// SetState sets the state +func (d *Data) SetState(state file.State) { + d.state = state +} + +// GetState returns the current state +func (d *Data) GetState() file.State { + return d.state +} + +// HasState returns true if the data object contains state data +func (d *Data) HasState() bool { + return d.state != file.State{} +} + +// GetEvent returns the event in the data object +// In case meta data contains module and fileset data, the event is enriched with it +func (d *Data) GetEvent() common.MapStr { + if d.Meta.Fileset != "" && d.Meta.Module != "" { + d.Event["fileset"] = common.MapStr{ + "name": d.Meta.Fileset, + "module": d.Meta.Module, + } + } + return d.Event +} + +// GetMetadata creates a common.MapStr containing the metadata to +// be associated with the event. +func (d *Data) GetMetadata() common.MapStr { + if d.Meta.Pipeline != "" { + return common.MapStr{ + "pipeline": d.Meta.Pipeline, + } + } + return nil +} + +// HasEvent returns true if the data object contains event data +func (d *Data) HasEvent() bool { + return d.Event != nil +} diff --git a/filebeat/util/data_test.go b/filebeat/util/data_test.go new file mode 100644 index 000000000000..7e4cad2e536f --- /dev/null +++ b/filebeat/util/data_test.go @@ -0,0 +1,39 @@ +package util + +import ( + "testing" + + "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/libbeat/common" + "github.com/stretchr/testify/assert" +) + +func TestNewData(t *testing.T) { + + data := NewData() + + assert.False(t, data.HasEvent()) + assert.False(t, data.HasState()) + + data.SetState(file.State{Source: "-"}) + + assert.False(t, data.HasEvent()) + assert.True(t, data.HasState()) + + data.Event = common.MapStr{} + + assert.True(t, data.HasEvent()) + assert.True(t, data.HasState()) +} + +func TestGetEvent(t *testing.T) { + + data := NewData() + data.Meta.Module = "testmodule" + data.Meta.Fileset = "testfileset" + data.Event = common.MapStr{"hello": "world"} + + out := common.MapStr{"fileset": common.MapStr{"module": "testmodule", "name": "testfileset"}, "hello": "world"} + + assert.Equal(t, out, data.GetEvent()) +}
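
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the new util.Data object added in filebeat/util/data.go is meant to be used end to end, assuming the filebeat/util, filebeat/input/file and libbeat/common packages as they appear in this change. A producer (prospector or harvester) builds either a pure state update or a full entry with Event and Meta filled in; a consumer splits entries the way publisher.getDataEvents and registrar.processEventStates do. The log path, module, fileset and pipeline names are placeholders.

package main

import (
	"fmt"

	"github.com/elastic/beats/filebeat/input/file"
	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/common"
)

// splitEntries mirrors the consumer side of this patch: entries with an event
// body go to the outputs, entries with state go to the registrar, and a single
// Data object may carry both.
func splitEntries(entries []*util.Data) (events []common.MapStr, states []file.State) {
	for _, d := range entries {
		if d.HasEvent() {
			events = append(events, d.GetEvent())
		}
		if d.HasState() {
			states = append(states, d.GetState())
		}
	}
	return events, states
}

func main() {
	// State-only update, as sent by Prospector.updateState or
	// Harvester.SendStateUpdate: no event payload, only registrar state.
	stateOnly := util.NewData()
	stateOnly.SetState(file.State{Source: "/var/log/example.log"}) // placeholder path

	// Full entry, as built in Harvester.Harvest: state, event body and
	// prospector metadata.
	full := util.NewData()
	full.SetState(file.State{Source: "/var/log/example.log", Offset: 42})
	full.Event = common.MapStr{"message": "hello world"}
	full.Meta.Module = "nginx"                   // placeholder module
	full.Meta.Fileset = "access"                 // placeholder fileset
	full.Meta.Pipeline = "filebeat-nginx-access" // placeholder ingest pipeline

	events, states := splitEntries([]*util.Data{stateOnly, full})
	fmt.Println(len(events), len(states)) // 1 2

	// GetEvent enriches the event with fileset/module information;
	// GetMetadata exposes the ingest pipeline to the publisher.
	fmt.Println(full.GetEvent())
	fmt.Println(full.GetMetadata())
}

Note on the design as visible in data.go: the state field is kept unexported behind SetState/GetState so that HasState can simply compare against the zero file.State, while Event and Meta stay exported so the harvester can fill them in directly.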