Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #14875 to 7.x: Refactor metadata generator to support adding metadata across resources #15528

Merged
merged 1 commit into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Libbeat: Do not overwrite agent.*, ecs.version, and host.name. {pull}14407[14407]
- Libbeat: Cleanup the x-pack licenser code to use the new license endpoint and the new format. {pull}15091[15091]
- Users can now specify `monitoring.cloud.*` to override `monitoring.elasticsearch.*` settings. {issue}14399[14399] {pull}15254[15254]
- Refactor metadata generator to support adding metadata across resources {pull}14875[14875]
- Update to ECS 1.4.0. {pull}14844[14844]

*Auditbeat*
Expand Down
4 changes: 4 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
Expand All @@ -48,6 +50,8 @@ type Config struct {
Builders []*common.Config `config:"builders"`
Appenders []*common.Config `config:"appenders"`
Templates template.MapperSettings `config:"templates"`

AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
}

func defaultConfig() *Config {
Expand Down
20 changes: 6 additions & 14 deletions libbeat/autodiscover/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,26 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/libbeat/common/safemapstr"
"github.com/elastic/beats/libbeat/logp"
)

type node struct {
uuid uuid.UUID
config *Config
metagen kubernetes.MetaGenerator
metagen metadata.MetaGen
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
}

// NewNodeEventer creates an eventer that can discover and process node objects
func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
metagen, err := kubernetes.NewMetaGenerator(cfg)
if err != nil {
return nil, err
}

logger := logp.NewLogger("autodiscover.node")

config := defaultConfig()
err = cfg.Unpack(&config)
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}
Expand All @@ -70,7 +66,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
})
}, nil)

if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
Expand All @@ -80,7 +76,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu
config: config,
uuid: uuid,
publish: publish,
metagen: metagen,
metagen: metadata.NewNodeMetadataGenerator(cfg, watcher.Store()),
logger: logger,
watcher: watcher,
}
Expand Down Expand Up @@ -172,11 +168,7 @@ func (n *node) emit(node *kubernetes.Node, flag string) {
}

eventID := fmt.Sprint(node.GetObjectMeta().GetUID())
meta := n.metagen.ResourceMetadata(node)

// TODO: Refactor metagen to make sure that this is seamless
meta.Put("node.name", node.Name)
meta.Put("node.uid", string(node.GetObjectMeta().GetUID()))
meta := n.metagen.Generate(node)

kubemeta := meta.Clone()
// Pass annotations to all events so that it can be used in templating and by annotation builders.
Expand Down
18 changes: 12 additions & 6 deletions libbeat/autodiscover/providers/kubernetes/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -114,6 +116,11 @@ func TestEmitEvent_Node(t *testing.T) {
nodeIP := "192.168.0.1"
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
UUID, err := uuid.NewV4()

typeMeta := metav1.TypeMeta{
Kind: "Node",
APIVersion: "v1",
}
if err != nil {
t.Fatal(err)
}
Expand All @@ -134,6 +141,7 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{
Expand Down Expand Up @@ -180,7 +188,8 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: v1.NodeStatus{},
TypeMeta: typeMeta,
Status: v1.NodeStatus{},
},
Expected: nil,
},
Expand All @@ -194,6 +203,7 @@ func TestEmitEvent_Node(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{},
Conditions: []v1.NodeCondition{
Expand Down Expand Up @@ -236,11 +246,7 @@ func TestEmitEvent_Node(t *testing.T) {
t.Fatal(err)
}

metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
if err != nil {
t.Fatal(err)
}

metaGen := metadata.NewNodeMetadataGenerator(common.NewConfig(), nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
Expand Down
95 changes: 74 additions & 21 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,31 @@ import (
k8s "k8s.io/client-go/kubernetes"

"github.com/elastic/beats/libbeat/autodiscover/builder"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
"github.com/elastic/beats/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/libbeat/common/safemapstr"
"github.com/elastic/beats/libbeat/logp"
)

type pod struct {
uuid uuid.UUID
config *Config
metagen kubernetes.MetaGenerator
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
uuid uuid.UUID
config *Config
metagen metadata.MetaGen
logger *logp.Logger
publish func(bus.Event)
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
}

// NewPodEventer creates an eventer that can discover and process pod objects
func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, publish func(event bus.Event)) (Eventer, error) {
metagen, err := kubernetes.NewMetaGenerator(cfg)
if err != nil {
return nil, err
}

logger := logp.NewLogger("autodiscover.pod")

config := defaultConfig()
err = cfg.Unpack(&config)
err := cfg.Unpack(&config)
if err != nil {
return nil, err
}
Expand All @@ -71,18 +68,52 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub
SyncTimeout: config.SyncPeriod,
Node: config.Node,
Namespace: config.Namespace,
})
}, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Pod{}, err)
}

var nodeMeta, namespaceMeta metadata.MetaGen
var nodeWatcher, namespaceWatcher kubernetes.Watcher
metaConf := config.AddResourceMetadata
if metaConf != nil {
if metaConf.Node != nil && metaConf.Node.Enabled() {
options := kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Node: config.Node,
}
if config.Namespace != "" {
options.Namespace = config.Namespace
}
nodeWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err)
}

nodeMeta = metadata.NewNodeMetadataGenerator(metaConf.Node, nodeWatcher.Store())
}

if metaConf.Namespace != nil && metaConf.Namespace.Enabled() {
namespaceWatcher, err = kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
}, nil)
if err != nil {
return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err)
}

namespaceMeta = metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store())
}
}

p := &pod{
config: config,
uuid: uuid,
publish: publish,
metagen: metagen,
logger: logger,
watcher: watcher,
config: config,
uuid: uuid,
publish: publish,
metagen: metadata.NewPodMetadataGenerator(cfg, watcher.Store(), nodeMeta, namespaceMeta),
logger: logger,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
}

watcher.AddEventHandler(p)
Expand Down Expand Up @@ -168,12 +199,33 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {

// Start starts the eventer
func (p *pod) Start() error {
if p.nodeWatcher != nil {
err := p.nodeWatcher.Start()
if err != nil {
return err
}
}

if p.namespaceWatcher != nil {
if err := p.namespaceWatcher.Start(); err != nil {
return err
}
}

return p.watcher.Start()
}

// Stop stops the eventer
func (p *pod) Stop() {
p.watcher.Stop()

if p.namespaceWatcher != nil {
p.namespaceWatcher.Stop()
}

if p.nodeWatcher != nil {
p.nodeWatcher.Stop()
}
}

func (p *pod) emit(pod *kubernetes.Pod, flag string) {
Expand Down Expand Up @@ -231,7 +283,8 @@ func (p *pod) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernet
"image": c.Image,
"runtime": runtimes[c.Name],
}
meta := p.metagen.ContainerMetadata(pod, c.Name, c.Image)
meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Name),
metadata.WithFields("container.image", c.Image))

// Information that can be used in discovering a workload
kubemeta := meta.Clone()
Expand Down
18 changes: 13 additions & 5 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common/kubernetes/metadata"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -175,6 +177,11 @@ func TestEmitEvent(t *testing.T) {
t.Fatal(err)
}

typeMeta := metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
}

tests := []struct {
Message string
Flag string
Expand All @@ -192,6 +199,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -264,6 +272,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Expand Down Expand Up @@ -295,6 +304,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -326,6 +336,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Expand Down Expand Up @@ -393,6 +404,7 @@ func TestEmitEvent(t *testing.T) {
Labels: map[string]string{},
Annotations: map[string]string{},
},
TypeMeta: typeMeta,
Status: v1.PodStatus{
PodIP: podIP,
ContainerStatuses: []kubernetes.PodContainerStatus{
Expand Down Expand Up @@ -459,11 +471,7 @@ func TestEmitEvent(t *testing.T) {
t.Fatal(err)
}

metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
if err != nil {
t.Fatal(err)
}

metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
Expand Down
Loading