Skip to content

Commit

Permalink
Register the add_formatted_index as a usable processor. (#33800)
Browse files Browse the repository at this point in the history
* Register the add_formatted_index as a usage processor for the processors list.

* Re-add removed file.

* Add changelog entry.

* Add missing return in String().

* Change implementation to use a boolean.

(cherry picked from commit 4c4eecf)
  • Loading branch information
blakerouse authored and mergify[bot] committed Nov 28, 2022
1 parent 26860af commit ee8a9bb
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

- Beats will now attempt to recover if a lockfile has not been removed {pull}[33169]
- Add `http.pprof` config options for enabling block and mutex profiling. {issue}33572[33572] {pull}33576[33576]
- Added append Processor which will append concrete values or values from a field to target. {issue}29934[29934] {pull}33364[33364]
- Add `add_formatted_index` processor that allows the resulting index for an event to be changed based on content from the event. {pull}33800[33800]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions libbeat/cmd/instance/imports_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
_ "github.com/elastic/beats/v7/libbeat/monitoring/report/elasticsearch" // Register default monitoring reporting
_ "github.com/elastic/beats/v7/libbeat/processors/actions" // Register default processors.
_ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
_ "github.com/elastic/beats/v7/libbeat/processors/add_host_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_id"
_ "github.com/elastic/beats/v7/libbeat/processors/add_locale"
Expand Down
15 changes: 15 additions & 0 deletions libbeat/common/fmtstr/formattimestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,21 @@ func (fs *TimestampFormatString) Run(timestamp time.Time) (string, error) {
return fs.eventFormatString.Run(placeholderEvent)
}

// RunEvent executes the format string returning a new expanded string or an error
// if execution or event field expansion fails.
func (fs *TimestampFormatString) RunEvent(event *beat.Event) (string, error) {
return fs.eventFormatString.Run(event)
}

func (fs *TimestampFormatString) String() string {
return fs.eventFormatString.expression
}

// Unpack tries to initialize the TimestampFormatString from provided value
// (which must be a string). Unpack method satisfies go-ucfg.Unpacker interface
// required by config.C, in order to use TimestampFormatString with
// `common.(*Config).Unpack()`.
func (fs *TimestampFormatString) Unpack(v interface{}) error {
fs.eventFormatString = &EventFormatString{}
return fs.eventFormatString.Unpack(v)
}
27 changes: 25 additions & 2 deletions libbeat/processors/add_formatted_index/add_formatted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,48 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

func init() {
processors.RegisterPlugin("add_formatted_index", NewC)
}

// AddFormattedIndex is a Processor to set an event's "raw_index" metadata field
// with a given TimestampFormatString. The elasticsearch output interprets
// that field as specifying the (raw string) index the event should be sent to;
// in other outputs it is just included in the metadata.
type AddFormattedIndex struct {
formatString *fmtstr.TimestampFormatString
fullEvent bool
}

// New returns a new AddFormattedIndex processor.
func New(formatString *fmtstr.TimestampFormatString) *AddFormattedIndex {
return &AddFormattedIndex{formatString}
return &AddFormattedIndex{formatString, false}
}

// NewC constructs a new AddFormattedIndex processor from configuration
func NewC(cfg *conf.C) (processors.Processor, error) {
var c config
if err := cfg.Unpack(&c); err != nil {
return nil, err
}

return &AddFormattedIndex{c.Index, true}, nil
}

// Run runs the processor.
func (p *AddFormattedIndex) Run(event *beat.Event) (*beat.Event, error) {
index, err := p.formatString.Run(event.Timestamp)
var index string
var err error
if p.fullEvent {
index, err = p.formatString.RunEvent(event)
} else {
index, err = p.formatString.Run(event.Timestamp)
}
if err != nil {
return nil, err
}
Expand Down
39 changes: 39 additions & 0 deletions libbeat/processors/add_formatted_index/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package add_formatted_index

import (
"errors"

"github.com/elastic/beats/v7/libbeat/common/fmtstr"
)

// configuration for AddFormattedIndex processor.
type config struct {
Index *fmtstr.TimestampFormatString `config:"index"` // Index formatted string value
}

// Validate ensures that the configuration is valid.
func (c *config) Validate() error {
// Validate type of ID generator
if c.Index == nil {
return errors.New("index field is required")
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[[add-locale]]
=== Add formatted index

++++
<titleabbrev>add_formatted_index</titleabbrev>
++++

The `add_formatted_index` processor allows the destination index for the event to
be changed based on a formatted string that can use values from fields defined on
the event.

For example, this configuration uses a custom field, fields.log_type, to set the index:

[source,yaml]
-------------------------------------------------------------------------------
processors:
- add_formatted_index:
index: "%{[fields.log_type]}-%{[agent.version]}-%{+yyyy.MM.dd}"
-------------------------------------------------------------------------------

With this configuration, all events with log_type: normal are sent to an index named
normal-7.10.2-2022-11-18, and all events with log_type: critical are sent to an index
named critical-7.10.2-2022-11-18.

0 comments on commit ee8a9bb

Please sign in to comment.