Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce add_labels and add_tags processors #9973

Merged
merged 8 commits into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update field definitions for `http` to ECS Beta 2 {pull}9645[9645]
- Add `agent.id` and `agent.ephemeral_id` fields to all beats. {pull}9404[9404]
- Add `name` config option to `add_host_metadata` processor. {pull}9943[9943]
- Add `add_labels` and `add_tags` processors. {pull}9973[9973]

*Auditbeat*

Expand Down
91 changes: 91 additions & 0 deletions libbeat/processors/actions/add_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"encoding/json"
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

type addLabels struct {
labels common.MapStr
shared bool
}

// LabelsKey is the default target key for the add_labels processor.
const LabelsKey = "labels"

func init() {
processors.RegisterPlugin("add_labels",
configChecked(createAddLabels,
requireFields("labels"),
allowedFields("labels", "when")))
}

func createAddLabels(c *common.Config) (processors.Processor, error) {
config := struct {
Labels common.MapStr `config:"labels" validate:"required"`
Target *string `config:"target"`
}{}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the add_fields configuration: %s", err)
}

var target string
if config.Target == nil {
target = LabelsKey
} else {
target = *config.Target
}

labels := config.Labels
if target != "" {
labels = common.MapStr{
target: labels,
}
}

return NewAddLabels(labels, true), nil
}

// NewAddLabels creates a new processor adding the given object to events. Set
// `shared` true if there is the chance of labels being changed/modified by
// subsequent processors.
func NewAddLabels(labels common.MapStr, shared bool) processors.Processor {
return &addLabels{labels: labels, shared: shared}
}

func (af *addLabels) Run(event *beat.Event) (*beat.Event, error) {
labels := af.labels
if af.shared {
labels = labels.Clone()
}

event.Fields.DeepUpdate(labels)
return event, nil
}

func (af *addLabels) String() string {
s, _ := json.Marshal(af.labels)
return fmt.Sprintf("add_labels=%s", s)
}
128 changes: 128 additions & 0 deletions libbeat/processors/actions/add_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

func TestAddLabels(t *testing.T) {
multi := func(strs ...string) []string { return strs }
single := func(str string) []string { return multi(str) }

cases := map[string]struct {
event common.MapStr
want common.MapStr
cfg []string
}{
"add label": {
event: common.MapStr{},
want: common.MapStr{
"labels": common.MapStr{"label": "test"},
},
cfg: single(`{labels: {label: test}}`),
},
"custom target": {
event: common.MapStr{},
want: common.MapStr{
"my": common.MapStr{"label": "test"},
},
cfg: single(`{target: my, labels: {label: test}}`),
},
"overwrite existing label": {
event: common.MapStr{
"labels": common.MapStr{"label": "old"},
},
want: common.MapStr{
"labels": common.MapStr{"label": "test"},
},
cfg: single(`{labels: {label: test}}`),
},
"merge with existing labels": {
event: common.MapStr{
"labels": common.MapStr{"existing": "a"},
},
want: common.MapStr{
"labels": common.MapStr{"existing": "a", "label": "test"},
},
cfg: single(`{labels: {label: test}}`),
},
"combine 2 processors": {
event: common.MapStr{},
want: common.MapStr{
"labels": common.MapStr{
"l1": "a",
"l2": "b",
},
},
cfg: multi(
`{labels: {l1: a}}`,
`{labels: {l2: b}}`,
),
},
"different targets": {
event: common.MapStr{},
want: common.MapStr{
"a": common.MapStr{"l1": "a"},
"b": common.MapStr{"l2": "b"},
},
cfg: multi(
`{target: a, labels: {l1: a}}`,
`{target: b, labels: {l2: b}}`,
),
},
}

for name, test := range cases {
test := test
t.Run(name, func(t *testing.T) {
processors := make([]processors.Processor, len(test.cfg))
for i := range test.cfg {
config, err := common.NewConfigWithYAML([]byte(test.cfg[i]), "test")
if err != nil {
t.Fatalf("Failed to create config(%v): %+v", i, err)
}

processors[i], err = createAddLabels(config)
if err != nil {
t.Fatalf("Failed to create add_tags processor(%v): %+v", i, err)
}
}

current := &beat.Event{Fields: test.event.Clone()}
for i, processor := range processors {
var err error
current, err = processor.Run(current)
if err != nil {
t.Fatalf("Unexpected error from add_tags processor(%v): %+v", i, err)
}
if current == nil {
t.Fatalf("Event dropped(%v)", i)
}
}

assert.Equal(t, test.want, current.Fields)
})
}
}
81 changes: 81 additions & 0 deletions libbeat/processors/actions/add_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

type addTags struct {
tags []string
target string
}

func init() {
processors.RegisterPlugin("add_tags",
configChecked(createAddTags,
requireFields("tags"),
allowedFields("tags", "when")))
}

func createAddTags(c *common.Config) (processors.Processor, error) {
config := struct {
Tags []string `config:"tags" validate:"required"`
Target string `config:"target"`
urso marked this conversation as resolved.
Show resolved Hide resolved
}{}

err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the add_tags configuration: %s", err)
}

return NewAddTags(config.Target, config.Tags), nil
}

// NewAddTags creates a new processor for adding tags to a field.
// If the target field already contains tags, then the new tags will be
// appended to the existing list of tags.
func NewAddTags(target string, tags []string) processors.Processor {
urso marked this conversation as resolved.
Show resolved Hide resolved
if target == "" {
target = common.TagsKey
}

// make sure capacity == length such that different processors adding more tags
// do not change/overwrite each other on append
if cap(tags) != len(tags) {
tmp := make([]string, len(tags), len(tags))
copy(tmp, tags)
tags = tmp
}

return &addTags{tags: tags, target: target}
}

func (at *addTags) Run(event *beat.Event) (*beat.Event, error) {
common.AddTagsWithKey(event.Fields, at.target, at.tags)
return event, nil
}

func (at *addTags) String() string {
return fmt.Sprintf("add_tags=%v", strings.Join(at.tags, ","))
}
Loading