Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor add_kubernetes_metadata to support autodiscovery #5434

Merged
merged 1 commit into from
Oct 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add support for enabling TLS renegotiation. {issue}4386[4386]
- Add Azure VM support for add_cloud_metadata processor {pull}5355[5355]
- Add `output.file.permission` config option. {pull}4638[4638]
- Refactor add_kubernetes_metadata to support autodiscovery {pull}5434[5434]

*Auditbeat*

Expand Down
26 changes: 26 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package add_kubernetes_metadata

import (
"encoding/json"

corev1 "github.com/ericchiang/k8s/api/v1"

"github.com/elastic/beats/libbeat/logp"
)

func GetPodMeta(pod *corev1.Pod) *Pod {
bytes, err := json.Marshal(pod)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

po := &Pod{}
err = json.Unmarshal(bytes, po)
if err != nil {
logp.Warn("Unable to marshal %v", pod.String())
return nil
}

return po
}
250 changes: 250 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package add_kubernetes_metadata

import (
"fmt"
"strings"
"sync"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

const (
ContainerIndexerName = "container"
PodNameIndexerName = "pod_name"
)

// Indexer take known pods and generate all the metadata we need to enrich
// events in a efficient way. By preindexing the metadata in the way it will be
// checked when matching events
type Indexer interface {
// GetMetadata generates event metadata for the given pod, then returns the
// list of indexes to create, with the metadata to put on them
GetMetadata(pod *Pod) []MetadataIndex

// GetIndexes return the list of indexes the given pod belongs to. This function
// must return the same indexes than GetMetadata
GetIndexes(pod *Pod) []string
}

// MetadataIndex holds a pair of index -> metadata info
type MetadataIndex struct {
Index string
Data common.MapStr
}

type Indexers struct {
sync.RWMutex
indexers []Indexer
}

//GenMeta takes in pods to generate metadata for them
type GenMeta interface {
//GenerateMetaData generates metadata by taking in a pod as an input
GenerateMetaData(pod *Pod) common.MapStr
}

type IndexerConstructor func(config common.Config, genMeta GenMeta) (Indexer, error)

func NewIndexers(configs PluginConfig, metaGen *GenDefaultMeta) *Indexers {
indexers := []Indexer{}
for _, pluginConfigs := range configs {
for name, pluginConfig := range pluginConfigs {
indexFunc := Indexing.GetIndexer(name)
if indexFunc == nil {
logp.Warn("Unable to find indexing plugin %s", name)
continue
}

indexer, err := indexFunc(pluginConfig, metaGen)
if err != nil {
logp.Warn("Unable to initialize indexing plugin %s due to error %v", name, err)
}

indexers = append(indexers, indexer)
}
}

return &Indexers{
indexers: indexers,
}
}

// GetMetadata returns the composed metadata list from all registered indexers
func (i *Indexers) GetMetadata(pod *Pod) []MetadataIndex {
var metadata []MetadataIndex
i.RLock()
defer i.RUnlock()
for _, indexer := range i.indexers {
for _, m := range indexer.GetMetadata(pod) {
metadata = append(metadata, m)
}
}
return metadata
}

// GetIndexes returns the composed index list from all registered indexers
func (i *Indexers) GetIndexes(pod *Pod) []string {
var indexes []string
i.RLock()
defer i.RUnlock()
for _, indexer := range i.indexers {
for _, i := range indexer.GetIndexes(pod) {
indexes = append(indexes, i)
}
}
return indexes
}

func (i *Indexers) Empty() bool {
if len(i.indexers) == 0 {
return true
}

return false
}

type GenDefaultMeta struct {
annotations []string
labels []string
labelsExclude []string
}

func NewGenDefaultMeta(annotations, labels, labelsExclude []string) *GenDefaultMeta {
return &GenDefaultMeta{
annotations: annotations,
labels: labels,
labelsExclude: labelsExclude,
}
}

// GenerateMetaData generates default metadata for the given pod taking to account certain filters
func (g *GenDefaultMeta) GenerateMetaData(pod *Pod) common.MapStr {
labelMap := common.MapStr{}
annotationsMap := common.MapStr{}

if len(g.labels) == 0 {
for k, v := range pod.Metadata.Labels {
labelMap[k] = v
}
} else {
labelMap = generateMapSubset(pod.Metadata.Labels, g.labels)
}

// Exclude any labels that are present in the exclude_labels config
for _, label := range g.labelsExclude {
delete(labelMap, label)
}

annotationsMap = generateMapSubset(pod.Metadata.Annotations, g.annotations)

meta := common.MapStr{
"pod": common.MapStr{
"name": pod.Metadata.Name,
},
"namespace": pod.Metadata.Namespace,
}

if len(labelMap) != 0 {
meta["labels"] = labelMap
}

if len(annotationsMap) != 0 {
meta["annotations"] = annotationsMap
}

return meta
}

func generateMapSubset(input map[string]string, keys []string) common.MapStr {
output := common.MapStr{}
if input == nil {
return output
}

for _, key := range keys {
value, ok := input[key]
if ok {
output[key] = value
}
}

return output
}

// PodNameIndexer implements default indexer based on pod name
type PodNameIndexer struct {
genMeta GenMeta
}

func NewPodNameIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &PodNameIndexer{genMeta: genMeta}, nil
}

func (p *PodNameIndexer) GetMetadata(pod *Pod) []MetadataIndex {
data := p.genMeta.GenerateMetaData(pod)
return []MetadataIndex{
{
Index: fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name),
Data: data,
},
}
}

func (p *PodNameIndexer) GetIndexes(pod *Pod) []string {
return []string{fmt.Sprintf("%s/%s", pod.Metadata.Namespace, pod.Metadata.Name)}
}

// ContainerIndexer indexes pods based on all their containers IDs
type ContainerIndexer struct {
genMeta GenMeta
}

func NewContainerIndexer(_ common.Config, genMeta GenMeta) (Indexer, error) {
return &ContainerIndexer{genMeta: genMeta}, nil
}

func (c *ContainerIndexer) GetMetadata(pod *Pod) []MetadataIndex {
commonMeta := c.genMeta.GenerateMetaData(pod)
var metadata []MetadataIndex
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := containerID(status)
if cID == "" {
continue
}

containerMeta := commonMeta.Clone()
containerMeta["container"] = common.MapStr{
"name": status.Name,
}
metadata = append(metadata, MetadataIndex{
Index: cID,
Data: containerMeta,
})
}

return metadata
}

func (c *ContainerIndexer) GetIndexes(pod *Pod) []string {
var containers []string
for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
cID := containerID(status)
if cID == "" {
continue
}
containers = append(containers, cID)
}
return containers
}

func containerID(status PodContainerStatus) string {
cID := status.ContainerID
if cID != "" {
parts := strings.Split(cID, "//")
if len(parts) == 2 {
return parts[1]
}
}
return ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,38 +124,6 @@ func TestContainerIndexer(t *testing.T) {
assert.Equal(t, expected.String(), indexers[1].Data.String())
}

func TestFieldMatcher(t *testing.T) {
testCfg := map[string]interface{}{
"lookup_fields": []string{},
}
fieldCfg, err := common.NewConfigFrom(testCfg)

assert.Nil(t, err)
matcher, err := NewFieldMatcher(*fieldCfg)
assert.NotNil(t, err)

testCfg["lookup_fields"] = "foo"
fieldCfg, _ = common.NewConfigFrom(testCfg)

matcher, err = NewFieldMatcher(*fieldCfg)
assert.NotNil(t, matcher)
assert.Nil(t, err)

input := common.MapStr{
"foo": "bar",
}

out := matcher.MetadataIndex(input)
assert.Equal(t, out, "bar")

nonMatchInput := common.MapStr{
"not": "match",
}

out = matcher.MetadataIndex(nonMatchInput)
assert.Equal(t, out, "")
}

func TestFilteredGenMeta(t *testing.T) {
var testConfig = common.NewConfig()

Expand Down Expand Up @@ -269,49 +237,3 @@ func TestFilteredGenMetaExclusion(t *testing.T) {
ok, _ = labelMap.HasKey("x")
assert.Equal(t, ok, false)
}

func TestFieldFormatMatcher(t *testing.T) {
testCfg := map[string]interface{}{}
fieldCfg, err := common.NewConfigFrom(testCfg)

assert.Nil(t, err)
matcher, err := NewFieldFormatMatcher(*fieldCfg)
assert.NotNil(t, err)

testCfg["format"] = `%{[namespace]}/%{[pod]}`
fieldCfg, _ = common.NewConfigFrom(testCfg)

matcher, err = NewFieldFormatMatcher(*fieldCfg)
assert.NotNil(t, matcher)
assert.Nil(t, err)

event := common.MapStr{
"namespace": "foo",
"pod": "bar",
}

out := matcher.MetadataIndex(event)
assert.Equal(t, "foo/bar", out)

event = common.MapStr{
"foo": "bar",
}
out = matcher.MetadataIndex(event)
assert.Empty(t, out)

testCfg["format"] = `%{[dimensions.namespace]}/%{[dimensions.pod]}`
fieldCfg, _ = common.NewConfigFrom(testCfg)
matcher, err = NewFieldFormatMatcher(*fieldCfg)
assert.NotNil(t, matcher)
assert.Nil(t, err)

event = common.MapStr{
"dimensions": common.MapStr{
"pod": "bar",
"namespace": "foo",
},
}

out = matcher.MetadataIndex(event)
assert.Equal(t, "foo/bar", out)
}
Loading