Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add complete k8s metadata through composable provider #27691

Merged
merged 17 commits into from
Sep 20, 2021
Merged
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newLocker(b *Beat) *locker {
}
}

// lock attemps to acquire a lock on the data path for the currently-running
// lock attempts to acquire a lock on the data path for the currently-running
// Beat instance. If another Beats instance already has a lock on the same data path
// an ErrAlreadyLocked error is returned.
func (l *locker) lock() error {
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,5 @@
- Add new --enroll-delay option for install and enroll commands. {pull}27118[27118]
- Add link to troubleshooting guide on fatal exits. {issue}26367[26367] {pull}27236[27236]
- Agent now adapts the beats queue size based on output settings. {issue}26638[26638] {pull}27429[27429]
- Support ephemeral containers in Kubernetes dynamic provider. {issue}#27020[#27020] {pull}27707[27707]
- Support ephemeral containers in Kubernetes dynamic provider. {issue}27020[#27020] {pull}27707[27707]
- Add complete k8s metadata through composable provider. {pull}27691[27691]
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package kubernetes
import (
"time"

"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand All @@ -25,6 +26,16 @@ type Config struct {

// Needed when resource is a Pod or Node
Node string `config:"node"`

AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is IncludeLabels and ExcludeLabels used? I was not able to find them in the diff.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are passed and used deeper in meta Generators, which is already existent codebase.


LabelsDedot bool `config:"labels.dedot"`
AnnotationsDedot bool `config:"annotations.dedot"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this just be the default? Do we need this configurable now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the history of these settings. Maybe some users had requests for these? @jsoriano @exekias do you think we could remove those settings and have dedoting as an always-on feature?


// Undocumented settings, to be deprecated in favor of `drop_fields` processor:
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If undocumented and deprecated, why add it? Being this is all new to Elastic Agent, it could be the time to remove it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right this one is not exposed so it should be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this touches code that is used by beats too. I will remove it in follow up PR target 8.0

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR: #28006

}

// Resources config section for resources' config blocks
Expand All @@ -44,6 +55,9 @@ func (c *Config) InitDefaults() {
c.CleanupTimeout = 60 * time.Second
c.SyncPeriod = 10 * time.Minute
c.Scope = "node"
c.IncludeCreatorMetadata = true
c.LabelsDedot = true
c.AnnotationsDedot = true
}

// Validate ensures correctness of config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,26 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}

return &dynamicProvider{logger, &cfg}, nil
}

// Run runs the kubernetes context provider.
func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
if p.config.Resources.Pod.Enabled {
err := p.watchResource(comm, "pod", p.config)
err := p.watchResource(comm, "pod")
if err != nil {
return err
}
}
if p.config.Resources.Node.Enabled {
err := p.watchResource(comm, "node", p.config)
err := p.watchResource(comm, "node")
if err != nil {
return err
}
}
if p.config.Resources.Service.Enabled {
err := p.watchResource(comm, "service", p.config)
err := p.watchResource(comm, "service")
if err != nil {
return err
}
Expand All @@ -76,9 +77,8 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
// and starts watching for such resource's events.
func (p *dynamicProvider) watchResource(
comm composable.DynamicProviderComm,
resourceType string,
config *Config) error {
client, err := kubernetes.GetKubernetesClient(config.KubeConfig)
resourceType string) error {
client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig)
if err != nil {
// info only; return nil (do nothing)
p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err)
Expand All @@ -93,24 +93,24 @@ func (p *dynamicProvider) watchResource(
p.logger.Debugf(
"Initializing Kubernetes watcher for resource %s using node: %v",
resourceType,
config.Node)
p.config.Node)
nd := &kubernetes.DiscoverKubernetesNodeParams{
ConfigHost: config.Node,
ConfigHost: p.config.Node,
Client: client,
IsInCluster: kubernetes.IsInCluster(config.KubeConfig),
IsInCluster: kubernetes.IsInCluster(p.config.KubeConfig),
HostUtils: &kubernetes.DefaultDiscoveryUtils{},
}
config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd)
p.config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd)
if err != nil {
p.logger.Debugf("Kubernetes provider skipped, unable to discover node: %w", err)
return nil
}

} else {
config.Node = ""
p.config.Node = ""
}

watcher, err := p.newWatcher(resourceType, comm, client, config)
watcher, err := p.newWatcher(resourceType, comm, client)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
}
Expand All @@ -126,23 +126,22 @@ func (p *dynamicProvider) watchResource(
func (p *dynamicProvider) newWatcher(
resourceType string,
comm composable.DynamicProviderComm,
client k8s.Interface,
config *Config) (kubernetes.Watcher, error) {
client k8s.Interface) (kubernetes.Watcher, error) {
switch resourceType {
case "pod":
watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
case "node":
watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
case "service":
watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope)
watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
Expand Down
61 changes: 37 additions & 24 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand All @@ -25,6 +26,7 @@ type node struct {
comm composable.DynamicProviderComm
scope string
config *Config
metagen metadata.MetaGen
}

type nodeData struct {
Expand All @@ -49,13 +51,25 @@ func NewNodeWatcher(
if err != nil {
return nil, errors.New(err, "couldn't create kubernetes watcher")
}
watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope, cfg})

rawConfig, err := common.NewConfigFrom(cfg)
if err != nil {
return nil, errors.New(err, "failed to unpack configuration")
}
metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client)
watcher.AddEventHandler(&node{
logger,
cfg.CleanupTimeout,
comm,
scope,
cfg,
metaGen})

return watcher, nil
}

func (n *node) emitRunning(node *kubernetes.Node) {
data := generateNodeData(node, n.config)
data := generateNodeData(node, n.config, n.metagen)
if data == nil {
return
}
Expand Down Expand Up @@ -165,7 +179,7 @@ func isNodeReady(node *kubernetes.Node) bool {
return false
}

func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
func generateNodeData(node *kubernetes.Node, cfg *Config, kubeMetaGen metadata.MetaGen) *nodeData {
host := getAddress(node)

// If a node doesn't have an IP then dont monitor it
Expand All @@ -178,41 +192,40 @@ func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData {
return nil
}

//TODO: add metadata here too ie -> meta := n.metagen.Generate(node)
meta := kubeMetaGen.Generate(node)
kubemetaMap, err := meta.GetValue("kubernetes")
if err != nil {
return &nodeData{}
}

// Pass annotations to all events so that it can be used in templating and by annotation builders.
annotations := common.MapStr{}
for k, v := range node.GetObjectMeta().GetAnnotations() {
safemapstr.Put(annotations, k, v)
}

labels := common.MapStr{}
for k, v := range node.GetObjectMeta().GetLabels() {
// TODO: add dedoting option
safemapstr.Put(labels, k, v)
}
// k8sMapping includes only the metadata that fall under kubernetes.*
// and these are available as dynamic vars through the provider
k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone())

mapping := map[string]interface{}{
"node": map[string]interface{}{
"uid": string(node.GetUID()),
"name": node.GetName(),
"labels": labels,
"annotations": annotations,
"ip": host,
},
}
// add annotations to be discoverable by templates
k8sMapping["annotations"] = annotations

processors := []map[string]interface{}{
{
processors := []map[string]interface{}{}
// meta map includes metadata that go under kubernetes.*
// but also other ECS fields like orchestrator.*
for field, metaMap := range meta {
processor := map[string]interface{}{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
"fields": metaMap,
"target": field,
},
},
}
processors = append(processors, processor)
}
return &nodeData{
node: node,
mapping: mapping,
mapping: k8sMapping,
processors: processors,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package kubernetes
import (
"testing"

"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -41,32 +43,96 @@ func TestGenerateNodeData(t *testing.T) {
},
}

data := generateNodeData(node, &Config{})
data := generateNodeData(node, &Config{}, &nodeMeta{})

mapping := map[string]interface{}{
"node": map[string]interface{}{
"node": common.MapStr{
"uid": string(node.GetUID()),
"name": node.GetName(),
"labels": common.MapStr{
"foo": "bar",
},
"annotations": common.MapStr{
"baz": "ban",
},
"ip": "node1",
"ip": "node1",
},
"annotations": common.MapStr{
"baz": "ban",
},
"labels": common.MapStr{
"foo": "bar",
},
}

processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": mapping,
"target": "kubernetes",
},
processors := map[string]interface{}{
"orchestrator": common.MapStr{
"cluster": common.MapStr{
"name": "devcluster",
"url": "8.8.8.8:9090"},
}, "kubernetes": common.MapStr{
"labels": common.MapStr{"foo": "bar"},
"annotations": common.MapStr{"baz": "ban"},
"node": common.MapStr{
"ip": "node1",
"name": "testnode",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133"},
},
}

assert.Equal(t, node, data.node)
assert.Equal(t, mapping, data.mapping)
assert.Equal(t, processors, data.processors)
for _, v := range data.processors {
k := v["add_fields"].(map[string]interface{})
target := k["target"].(string)
fields := k["fields"]
assert.Equal(t, processors[target], fields)
}
}

type nodeMeta struct{}

// Generate generates node metadata from a resource object
// Metadata map is in the following form:
// {
// "kubernetes": {},
// "some.ecs.field": "asdf"
// }
// All Kubernetes fields that need to be stored under kubernetes. prefix are populated by
// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method
func (n *nodeMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr {
ecsFields := n.GenerateECS(obj)
meta := common.MapStr{
"kubernetes": n.GenerateK8s(obj, opts...),
}
meta.DeepUpdate(ecsFields)
return meta
}

// GenerateECS generates node ECS metadata from a resource object
func (n *nodeMeta) GenerateECS(obj kubernetes.Resource) common.MapStr {
return common.MapStr{
"orchestrator": common.MapStr{
"cluster": common.MapStr{
"name": "devcluster",
"url": "8.8.8.8:9090",
},
},
}
}

// GenerateK8s generates node metadata from a resource object
func (n *nodeMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr {
k8sNode := obj.(*kubernetes.Node)
return common.MapStr{
"node": common.MapStr{
"uid": string(k8sNode.GetUID()),
"name": k8sNode.GetName(),
"ip": "node1",
},
"labels": common.MapStr{
"foo": "bar",
},
"annotations": common.MapStr{
"baz": "ban",
},
}
}

// GenerateFromName generates node metadata from a node name
func (n *nodeMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr {
return nil
}
Loading