From 73b67bd54b2041459265ed4448708fd6ff01e1f6 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 1 Sep 2021 17:13:52 +0300 Subject: [PATCH 01/16] Add metagen for Pods in Agent Signed-off-by: chrismark --- .../composable/providers/kubernetes/config.go | 3 + .../providers/kubernetes/kubernetes.go | 11 +++- .../composable/providers/kubernetes/pod.go | 62 ++++++++++++++----- 3 files changed, 59 insertions(+), 17 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index d0538f433639..9084ccbc80ee 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -8,6 +8,7 @@ package kubernetes import ( + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "time" "github.com/elastic/beats/v7/libbeat/logp" @@ -25,6 +26,8 @@ type Config struct { // Needed when resource is a Pod or Node Node string `config:"node"` + + AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` } // Resources config section for resources' config blocks diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index 2c81c59230b3..27ec654326f8 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -6,7 +6,7 @@ package kubernetes import ( "fmt" - + "github.com/elastic/beats/v7/libbeat/common" k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/common/kubernetes" @@ -34,6 +34,7 @@ func init() { type dynamicProvider struct { logger *logger.Logger config *Config + rawConfig *common.Config } // DynamicProviderBuilder builds the dynamic provider. @@ -46,7 +47,11 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable if err != nil { return nil, errors.New(err, "failed to unpack configuration") } - return &dynamicProvider{logger, &cfg}, nil + rawConfig, err := common.NewConfigFrom(c) + if err != nil { + return nil, errors.New(err, "failed to unpack configuration") + } + return &dynamicProvider{logger, &cfg, rawConfig}, nil } // Run runs the kubernetes context provider. @@ -130,7 +135,7 @@ func (p *dynamicProvider) newWatcher( config *Config) (kubernetes.Watcher, error) { switch resourceType { case "pod": - watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope) + watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope, p.rawConfig) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index b0b3ab3b5258..82bca77bcf55 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -14,6 +14,7 @@ import ( k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" @@ -25,6 +26,7 @@ type pod struct { comm composable.DynamicProviderComm scope string config *Config + metagen metadata.MetaGen } type providerData struct { @@ -39,7 +41,8 @@ func NewPodWatcher( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string, + rawConfig *common.Config) (kubernetes.Watcher, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -49,13 +52,43 @@ func NewPodWatcher( if err != nil { return nil, errors.New(err, "couldn't create kubernetes watcher") } - watcher.AddEventHandler(&pod{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + options := kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + Node: cfg.Node, + } + metaConf := cfg.AddResourceMetadata + if metaConf == nil { + metaConf = metadata.GetDefaultResourceMetadataConfig() + } + nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) + } + namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + }, nil) + if err != nil { + logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } + + metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf) + watcher.AddEventHandler(&pod{ + logger, + cfg.CleanupTimeout, + comm, + scope, + cfg, + metaGen, + }) return watcher, nil } func (p *pod) emitRunning(pod *kubernetes.Pod) { - data := generatePodData(pod, p.config) + + meta := p.metagen.Generate(pod) + data := generatePodData(pod, p.config, meta) data.mapping["scope"] = p.scope // Emit the pod // We emit Pod + containers to ensure that configs matching Pod only @@ -123,7 +156,7 @@ func (p *pod) OnDelete(obj interface{}) { time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) } -func generatePodData(pod *kubernetes.Pod, cfg *Config) providerData { +func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMeta common.MapStr) providerData { // TODO: add metadata here too ie -> meta := s.metagen.Generate(pod) // Pass annotations to all events so that it can be used in templating and by annotation builders. @@ -138,16 +171,17 @@ func generatePodData(pod *kubernetes.Pod, cfg *Config) providerData { safemapstr.Put(labels, k, v) } - mapping := map[string]interface{}{ - "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ - "uid": string(pod.GetUID()), - "name": pod.GetName(), - "labels": labels, - "annotations": annotations, - "ip": pod.Status.PodIP, - }, - } + //mapping := map[string]interface{}{ + // "namespace": pod.GetNamespace(), + // "pod": map[string]interface{}{ + // "uid": string(pod.GetUID()), + // "name": pod.GetName(), + // "labels": labels, + // "annotations": annotations, + // "ip": pod.Status.PodIP, + // }, + //} + mapping := map[string]interface{}(kubeMeta) return providerData{ uid: string(pod.GetUID()), mapping: mapping, From 6d801f90e2f49c61aad3fd19e58d9ff4841a4fcb Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 8 Sep 2021 15:38:52 +0300 Subject: [PATCH 02/16] Pod metadata completion Signed-off-by: chrismark --- .../composable/providers/kubernetes/pod.go | 121 ++++++++++-------- 1 file changed, 69 insertions(+), 52 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 82bca77bcf55..820017c63892 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -87,8 +87,7 @@ func NewPodWatcher( func (p *pod) emitRunning(pod *kubernetes.Pod) { - meta := p.metagen.Generate(pod) - data := generatePodData(pod, p.config, meta) + data := generatePodData(pod, p.config, p.metagen) data.mapping["scope"] = p.scope // Emit the pod // We emit Pod + containers to ensure that configs matching Pod only @@ -103,8 +102,11 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { // TODO: deal with ephemeral containers } -func (p *pod) emitContainers(pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus) { - generateContainerData(p.comm, pod, containers, containerstatuses, p.config) +func (p *pod) emitContainers( + pod *kubernetes.Pod, + containers []kubernetes.Container, + containerstatuses []kubernetes.PodContainerStatus) { + generateContainerData(p.comm, pod, containers, containerstatuses, p.config, p.metagen) } func (p *pod) emitStopped(pod *kubernetes.Pod) { @@ -156,43 +158,35 @@ func (p *pod) OnDelete(obj interface{}) { time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) } -func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMeta common.MapStr) providerData { - // TODO: add metadata here too ie -> meta := s.metagen.Generate(pod) +func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMetaGen metadata.MetaGen) providerData { - // Pass annotations to all events so that it can be used in templating and by annotation builders. - annotations := common.MapStr{} - for k, v := range pod.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) + meta := kubeMetaGen.Generate(pod) + kubemetaMap, err := meta.GetValue("kubernetes") + if err != nil { + return providerData{} } - labels := common.MapStr{} - for k, v := range pod.GetObjectMeta().GetLabels() { - // TODO: add dedoting option - safemapstr.Put(labels, k, v) + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + + processors := []map[string]interface{}{} + // meta map includes metadata that go under kubernetes.* + // but also other ECS fields like orchestrator.* + for field, metaMap := range meta { + processor := map[string]interface{}{ + "add_fields": map[string]interface{}{ + "fields": metaMap, + "target": field, + }, + } + processors = append(processors, processor) } - //mapping := map[string]interface{}{ - // "namespace": pod.GetNamespace(), - // "pod": map[string]interface{}{ - // "uid": string(pod.GetUID()), - // "name": pod.GetName(), - // "labels": labels, - // "annotations": annotations, - // "ip": pod.Status.PodIP, - // }, - //} - mapping := map[string]interface{}(kubeMeta) return providerData{ uid: string(pod.GetUID()), - mapping: mapping, - processors: []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, - }, - }, + mapping: k8sMapping, + processors: processors, } } @@ -201,8 +195,8 @@ func generateContainerData( pod *kubernetes.Pod, containers []kubernetes.Container, containerstatuses []kubernetes.PodContainerStatus, - cfg *Config) { - //TODO: add metadata here too ie -> meta := s.metagen.Generate() + cfg *Config, + kubeMetaGen metadata.MetaGen) { containerIDs := map[string]string{} runtimes := map[string]string{} @@ -229,30 +223,53 @@ func generateContainerData( // ID is the combination of pod UID + container name eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) - mapping := map[string]interface{}{ - "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ - "uid": string(pod.GetUID()), - "name": pod.GetName(), - "labels": labels, - "ip": pod.Status.PodIP, - }, - "container": map[string]interface{}{ - "id": cid, - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], + meta := kubeMetaGen.Generate(pod, metadata.WithFields("container.name", c.Name)) + kubemetaMap, err := meta.GetValue("kubernetes") + if err != nil { + continue + } + + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + + // add container metadata under kubernetes.container.* to + // make them available to dynamics var resolution + k8sMapping["container"] = common.MapStr{ + "id": cid, + "name": c.Name, + "image": c.Image, + "runtime": runtimes[c.Name], + } + + //container ECS fields + cmeta := common.MapStr{ + "id": cid, + "runtime": runtimes[c.Name], + "image": common.MapStr{ + "name": c.Image, }, } processors := []map[string]interface{}{ { "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", + "fields": cmeta, + "target": "container", }, }, } - comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors) + // meta map includes metadata that go under kubernetes.* + // but also other ECS fields like orchestrator.* + for field, metaMap := range meta { + processor := map[string]interface{}{ + "add_fields": map[string]interface{}{ + "fields": metaMap, + "target": field, + }, + } + processors = append(processors, processor) + } + comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) } } From 7a8c55f7c14a46177d87fa42ab3589583ee06bf7 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 8 Sep 2021 16:31:43 +0300 Subject: [PATCH 03/16] Add metadata config options Signed-off-by: chrismark --- .../pkg/composable/providers/kubernetes/config.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index 9084ccbc80ee..e915e85ad655 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -28,6 +28,14 @@ type Config struct { Node string `config:"node"` AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` + IncludeLabels []string `config:"include_labels"` + ExcludeLabels []string `config:"exclude_labels"` + + LabelsDedot bool `config:"labels.dedot"` + AnnotationsDedot bool `config:"annotations.dedot"` + + // Undocumented settings, to be deprecated in favor of `drop_fields` processor: + IncludeCreatorMetadata bool `config:"include_creator_metadata"` } // Resources config section for resources' config blocks @@ -47,6 +55,9 @@ func (c *Config) InitDefaults() { c.CleanupTimeout = 60 * time.Second c.SyncPeriod = 10 * time.Minute c.Scope = "node" + c.IncludeCreatorMetadata = true + c.LabelsDedot = true + c.AnnotationsDedot = true } // Validate ensures correctness of config From a47867862b4c9e63b559a4203e54dc9e08e961be Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 8 Sep 2021 17:05:12 +0300 Subject: [PATCH 04/16] Add metadata for node and svc Signed-off-by: chrismark --- .../providers/kubernetes/kubernetes.go | 4 +- .../composable/providers/kubernetes/node.go | 64 ++++++++-------- .../providers/kubernetes/service.go | 74 +++++++++++-------- 3 files changed, 78 insertions(+), 64 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index 27ec654326f8..51e829bf202b 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -141,13 +141,13 @@ func (p *dynamicProvider) newWatcher( } return watcher, nil case "node": - watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope) + watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope, p.rawConfig) if err != nil { return nil, err } return watcher, nil case "service": - watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope) + watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope, p.rawConfig) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index 455a06107efe..20bb027a84fc 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -5,6 +5,7 @@ package kubernetes import ( + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "time" v1 "k8s.io/api/core/v1" @@ -13,7 +14,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" - "github.com/elastic/beats/v7/libbeat/common/safemapstr" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" @@ -25,6 +25,7 @@ type node struct { comm composable.DynamicProviderComm scope string config *Config + metagen metadata.MetaGen } type nodeData struct { @@ -39,7 +40,8 @@ func NewNodeWatcher( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string, + rawConfig *common.Config) (kubernetes.Watcher, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -49,13 +51,21 @@ func NewNodeWatcher( if err != nil { return nil, errors.New(err, "couldn't create kubernetes watcher") } - watcher.AddEventHandler(&node{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client) + watcher.AddEventHandler(&node{ + logger, + cfg.CleanupTimeout, + comm, + scope, + cfg, + metaGen}) return watcher, nil } func (n *node) emitRunning(node *kubernetes.Node) { - data := generateNodeData(node, n.config) + data := generateNodeData(node, n.config, n.metagen) if data == nil { return } @@ -165,7 +175,7 @@ func isNodeReady(node *kubernetes.Node) bool { return false } -func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData { +func generateNodeData(node *kubernetes.Node, cfg *Config, kubeMetaGen metadata.MetaGen) *nodeData { host := getAddress(node) // If a node doesn't have an IP then dont monitor it @@ -178,41 +188,31 @@ func generateNodeData(node *kubernetes.Node, cfg *Config) *nodeData { return nil } - //TODO: add metadata here too ie -> meta := n.metagen.Generate(node) - - // Pass annotations to all events so that it can be used in templating and by annotation builders. - annotations := common.MapStr{} - for k, v := range node.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) - } - - labels := common.MapStr{} - for k, v := range node.GetObjectMeta().GetLabels() { - // TODO: add dedoting option - safemapstr.Put(labels, k, v) + meta := kubeMetaGen.Generate(node) + kubemetaMap, err := meta.GetValue("kubernetes") + if err != nil { + return &nodeData{} } - mapping := map[string]interface{}{ - "node": map[string]interface{}{ - "uid": string(node.GetUID()), - "name": node.GetName(), - "labels": labels, - "annotations": annotations, - "ip": host, - }, - } + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) - processors := []map[string]interface{}{ - { + processors := []map[string]interface{}{} + // meta map includes metadata that go under kubernetes.* + // but also other ECS fields like orchestrator.* + for field, metaMap := range meta { + processor := map[string]interface{}{ "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", + "fields": metaMap, + "target": field, }, - }, + } + processors = append(processors, processor) } return &nodeData{ node: node, - mapping: mapping, + mapping: k8sMapping, processors: processors, } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index a0f73b16382e..19fa8c727a7b 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -5,6 +5,8 @@ package kubernetes import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "time" k8s "k8s.io/client-go/kubernetes" @@ -23,6 +25,7 @@ type service struct { comm composable.DynamicProviderComm scope string config *Config + metagen metadata.MetaGen } type serviceData struct { @@ -37,7 +40,8 @@ func NewServiceWatcher( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string, + rawConfig *common.Config) (kubernetes.Watcher, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -46,13 +50,31 @@ func NewServiceWatcher( if err != nil { return nil, errors.New(err, "couldn't create kubernetes watcher") } - watcher.AddEventHandler(&service{logger, cfg.CleanupTimeout, comm, scope, cfg}) + + metaConf := metadata.GetDefaultResourceMetadataConfig() + namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + SyncTimeout: cfg.SyncPeriod, + Namespace: cfg.Namespace, + }, nil) + if err != nil { + return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) + } + namespaceMeta := metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) + metaGen := metadata.NewServiceMetadataGenerator(cfg, watcher.Store(), namespaceMeta, client) + watcher.AddEventHandler(&service{ + logger, + cfg.CleanupTimeout, + comm, + scope, + cfg, + metaGen, + }) return watcher, nil } func (s *service) emitRunning(service *kubernetes.Service) { - data := generateServiceData(service, s.config) + data := generateServiceData(service, s.config, s.metagen) if data == nil { return } @@ -92,7 +114,7 @@ func (s *service) OnDelete(obj interface{}) { time.AfterFunc(s.cleanupTimeout, func() { s.emitStopped(service) }) } -func generateServiceData(service *kubernetes.Service, cfg *Config) *serviceData { +func generateServiceData(service *kubernetes.Service, cfg *Config, kubeMetaGen metadata.MetaGen) *serviceData { host := service.Spec.ClusterIP // If a service doesn't have an IP then dont monitor it @@ -100,41 +122,33 @@ func generateServiceData(service *kubernetes.Service, cfg *Config) *serviceData return nil } - //TODO: add metadata here too ie -> meta := s.metagen.Generate(service) - // Pass annotations to all events so that it can be used in templating and by annotation builders. - annotations := common.MapStr{} - for k, v := range service.GetObjectMeta().GetAnnotations() { - safemapstr.Put(annotations, k, v) - } - - labels := common.MapStr{} - for k, v := range service.GetObjectMeta().GetLabels() { - // TODO: add dedoting option - safemapstr.Put(labels, k, v) + meta := kubeMetaGen.Generate(service) + kubemetaMap, err := meta.GetValue("kubernetes") + if err != nil { + return &serviceData{} } - mapping := map[string]interface{}{ - "service": map[string]interface{}{ - "uid": string(service.GetUID()), - "name": service.GetName(), - "labels": labels, - "annotations": annotations, - "ip": host, - }, - } + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) - processors := []map[string]interface{}{ - { + processors := []map[string]interface{}{} + // meta map includes metadata that go under kubernetes.* + // but also other ECS fields like orchestrator.* + for field, metaMap := range meta { + processor := map[string]interface{}{ "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", + "fields": metaMap, + "target": field, }, - }, + } + processors = append(processors, processor) } + return &serviceData{ service: service, - mapping: mapping, + mapping: k8sMapping, processors: processors, } } From 595cb27dd46b55c93722f4e2cabf4a228793eccd Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 9 Sep 2021 12:21:31 +0300 Subject: [PATCH 05/16] Make annotations available in dynamic var resolution Signed-off-by: chrismark --- .../composable/providers/kubernetes/config.go | 7 ++-- .../providers/kubernetes/kubernetes.go | 8 ++-- .../composable/providers/kubernetes/node.go | 13 +++++- .../composable/providers/kubernetes/pod.go | 42 ++++++++++++------- .../providers/kubernetes/service.go | 19 ++++++--- 5 files changed, 63 insertions(+), 26 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index e915e85ad655..3f3d84297ece 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -8,9 +8,10 @@ package kubernetes import ( - "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "time" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/logp" ) @@ -28,8 +29,8 @@ type Config struct { Node string `config:"node"` AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` - IncludeLabels []string `config:"include_labels"` - ExcludeLabels []string `config:"exclude_labels"` + IncludeLabels []string `config:"include_labels"` + ExcludeLabels []string `config:"exclude_labels"` LabelsDedot bool `config:"labels.dedot"` AnnotationsDedot bool `config:"annotations.dedot"` diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index 51e829bf202b..ebf14723c457 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -6,9 +6,11 @@ package kubernetes import ( "fmt" - "github.com/elastic/beats/v7/libbeat/common" + k8s "k8s.io/client-go/kubernetes" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/kubernetes" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" @@ -32,8 +34,8 @@ func init() { } type dynamicProvider struct { - logger *logger.Logger - config *Config + logger *logger.Logger + config *Config rawConfig *common.Config } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index 20bb027a84fc..d0fe895f4008 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -5,9 +5,11 @@ package kubernetes import ( - "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "time" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common/safemapstr" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" k8s "k8s.io/client-go/kubernetes" @@ -194,10 +196,19 @@ func generateNodeData(node *kubernetes.Node, cfg *Config, kubeMetaGen metadata.M return &nodeData{} } + // Pass annotations to all events so that it can be used in templating and by annotation builders. + annotations := common.MapStr{} + for k, v := range node.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + // k8sMapping includes only the metadata that fall under kubernetes.* // and these are available as dynamic vars through the provider k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + // add annotations to be discoverable by templates + k8sMapping["annotations"] = annotations + processors := []map[string]interface{}{} // meta map includes metadata that go under kubernetes.* // but also other ECS fields like orchestrator.* diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 758fd2e7d4c2..2abf2d0b418f 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -188,8 +188,8 @@ func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMetaGen metadata.Meta } return providerData{ - uid: string(pod.GetUID()), - mapping: k8sMapping, + uid: string(pod.GetUID()), + mapping: k8sMapping, processors: processors, } } @@ -210,9 +210,10 @@ func generateContainerData( runtimes[c.Name] = runtime } - labels := common.MapStr{} - for k, v := range pod.GetObjectMeta().GetLabels() { - safemapstr.Put(labels, k, v) + // Pass annotations to all events so that it can be used in templating and by annotation builders. + annotations := common.MapStr{} + for k, v := range pod.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) } for _, c := range containers { @@ -237,14 +238,8 @@ func generateContainerData( // and these are available as dynamic vars through the provider k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) - // add container metadata under kubernetes.container.* to - // make them available to dynamics var resolution - k8sMapping["container"] = common.MapStr{ - "id": cid, - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], - } + // add annotations to be discoverable by templates + k8sMapping["annotations"] = annotations //container ECS fields cmeta := common.MapStr{ @@ -274,7 +269,26 @@ func generateContainerData( } processors = append(processors, processor) } - comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + + // add container metadata under kubernetes.container.* to + // make them available to dynamic var resolution + containerMeta := common.MapStr{ + "id": cid, + "name": c.Name, + "image": c.Image, + "runtime": runtimes[c.Name], + } + if len(c.Ports) > 0 { + for _, port := range c.Ports { + containerMeta.Put("port", port.ContainerPort) + containerMeta.Put("port_name", port.Name) + k8sMapping["container"] = containerMeta + comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + } + } else { + k8sMapping["container"] = containerMeta + comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + } } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index 19fa8c727a7b..fcbb13aa8a9d 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -6,14 +6,15 @@ package kubernetes import ( "fmt" - "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" "time" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common/safemapstr" + k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" - "github.com/elastic/beats/v7/libbeat/common/safemapstr" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" @@ -60,7 +61,7 @@ func NewServiceWatcher( return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) } namespaceMeta := metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) - metaGen := metadata.NewServiceMetadataGenerator(cfg, watcher.Store(), namespaceMeta, client) + metaGen := metadata.NewServiceMetadataGenerator(rawConfig, watcher.Store(), namespaceMeta, client) watcher.AddEventHandler(&service{ logger, cfg.CleanupTimeout, @@ -122,17 +123,25 @@ func generateServiceData(service *kubernetes.Service, cfg *Config, kubeMetaGen m return nil } - meta := kubeMetaGen.Generate(service) kubemetaMap, err := meta.GetValue("kubernetes") if err != nil { return &serviceData{} } + // Pass annotations to all events so that it can be used in templating and by annotation builders. + annotations := common.MapStr{} + for k, v := range service.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + // k8sMapping includes only the metadata that fall under kubernetes.* // and these are available as dynamic vars through the provider k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + // add annotations to be discoverable by templates + k8sMapping["annotations"] = annotations + processors := []map[string]interface{}{} // meta map includes metadata that go under kubernetes.* // but also other ECS fields like orchestrator.* @@ -145,7 +154,7 @@ func generateServiceData(service *kubernetes.Service, cfg *Config, kubeMetaGen m } processors = append(processors, processor) } - + return &serviceData{ service: service, mapping: k8sMapping, From c21d8e9167c99843fe77ba6bd5884d25c0497697 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 9 Sep 2021 15:52:36 +0300 Subject: [PATCH 06/16] Add ns updates handling Signed-off-by: chrismark --- .../composable/providers/kubernetes/node.go | 2 +- .../composable/providers/kubernetes/pod.go | 151 +++++++++++++++--- .../providers/kubernetes/service.go | 2 +- 3 files changed, 135 insertions(+), 20 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index d0fe895f4008..6808f96a89e2 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -204,7 +204,7 @@ func generateNodeData(node *kubernetes.Node, cfg *Config, kubeMetaGen metadata.M // k8sMapping includes only the metadata that fall under kubernetes.* // and these are available as dynamic vars through the provider - k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) // add annotations to be discoverable by templates k8sMapping["annotations"] = annotations diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 2abf2d0b418f..85139d04aa2a 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -6,6 +6,7 @@ package kubernetes import ( "fmt" + "sync" "time" "github.com/elastic/beats/v7/libbeat/common" @@ -21,12 +22,18 @@ import ( ) type pod struct { - logger *logp.Logger - cleanupTimeout time.Duration - comm composable.DynamicProviderComm - scope string - config *Config - metagen metadata.MetaGen + logger *logp.Logger + cleanupTimeout time.Duration + comm composable.DynamicProviderComm + scope string + config *Config + metagen metadata.MetaGen + namespaceWatcher kubernetes.Watcher + + // Mutex used by configuration updates not triggered by the main watcher, + // to avoid race conditions between cross updates and deletions. + // Other updaters must use a write lock. + crossUpdate sync.RWMutex } type providerData struct { @@ -35,6 +42,22 @@ type providerData struct { processors []map[string]interface{} } +// podUpdaterHandlerFunc is a function that handles pod updater notifications. +type podUpdaterHandlerFunc func(interface{}) + +// podUpdaterStore is the interface that an object needs to implement to be +// used as a pod updater store. +type podUpdaterStore interface { + List() []interface{} +} + +// namespacePodUpdater notifies updates on pods when their namespaces are updated. +type namespacePodUpdater struct { + handler podUpdaterHandlerFunc + store podUpdaterStore + locker sync.Locker +} + // NewPodWatcher creates a watcher that can discover and process pod objects func NewPodWatcher( comm composable.DynamicProviderComm, @@ -73,21 +96,30 @@ func NewPodWatcher( } metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf) - watcher.AddEventHandler(&pod{ - logger, - cfg.CleanupTimeout, - comm, - scope, - cfg, - metaGen, - }) + + p := pod{ + logger: logger, + cleanupTimeout: cfg.CleanupTimeout, + comm: comm, + scope: scope, + config: cfg, + metagen: metaGen, + namespaceWatcher: namespaceWatcher, + } + + watcher.AddEventHandler(&p) + + if namespaceWatcher != nil && metaConf.Namespace.Enabled() { + updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate) + namespaceWatcher.AddEventHandler(updater) + } return watcher, nil } func (p *pod) emitRunning(pod *kubernetes.Pod) { - data := generatePodData(pod, p.config, p.metagen) + data := generatePodData(pod, p.config, p.metagen, p.namespaceWatcher) data.mapping["scope"] = p.scope // Emit the pod // We emit Pod + containers to ensure that configs matching Pod only @@ -131,6 +163,9 @@ func (p *pod) emitStopped(pod *kubernetes.Pod) { // OnAdd ensures processing of pod objects that are newly added func (p *pod) OnAdd(obj interface{}) { + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() + p.logger.Debugf("pod add: %+v", obj) p.emitRunning(obj.(*kubernetes.Pod)) } @@ -139,6 +174,14 @@ func (p *pod) OnAdd(obj interface{}) { // if it is terminating, a stop event is scheduled, if not, a stop and a start // events are sent sequentially to recreate the resources assotiated to the pod. func (p *pod) OnUpdate(obj interface{}) { + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() + + p.unlockedUpdate(obj) +} + +func (p *pod) unlockedUpdate(obj interface{}) { + p.logger.Debugf("Watcher Pod update: %+v", obj) pod := obj.(*kubernetes.Pod) p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) @@ -157,12 +200,15 @@ func (p *pod) OnUpdate(obj interface{}) { // OnDelete stops pod objects that are deleted func (p *pod) OnDelete(obj interface{}) { + p.crossUpdate.RLock() + defer p.crossUpdate.RUnlock() + p.logger.Debugf("pod delete: %+v", obj) pod := obj.(*kubernetes.Pod) time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) } -func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMetaGen metadata.MetaGen) providerData { +func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMetaGen metadata.MetaGen, namespaceWatcher kubernetes.Watcher) providerData { meta := kubeMetaGen.Generate(pod) kubemetaMap, err := meta.GetValue("kubernetes") @@ -174,6 +220,11 @@ func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMetaGen metadata.Meta // and these are available as dynamic vars through the provider k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + namespaceAnnotations := podNamespaceAnnotations(pod, namespaceWatcher) + if len(namespaceAnnotations) != 0 { + k8sMapping["namespace_annotations"] = namespaceAnnotations + } + processors := []map[string]interface{}{} // meta map includes metadata that go under kubernetes.* // but also other ECS fields like orchestrator.* @@ -236,7 +287,7 @@ func generateContainerData( // k8sMapping includes only the metadata that fall under kubernetes.* // and these are available as dynamic vars through the provider - k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) // add annotations to be discoverable by templates k8sMapping["annotations"] = annotations @@ -280,7 +331,7 @@ func generateContainerData( } if len(c.Ports) > 0 { for _, port := range c.Ports { - containerMeta.Put("port", port.ContainerPort) + containerMeta.Put("port", fmt.Sprintf("%v", port.ContainerPort)) containerMeta.Put("port_name", port.Name) k8sMapping["container"] = containerMeta comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) @@ -304,3 +355,67 @@ func getEphemeralContainers(pod *kubernetes.Pod) ([]kubernetes.Container, []kube } return ephContainers, ephContainersStatuses } + +// podNamespaceAnnotations returns the annotations of the namespace of the pod +func podNamespaceAnnotations(pod *kubernetes.Pod, watcher kubernetes.Watcher) common.MapStr { + if watcher == nil { + return nil + } + + rawNs, ok, err := watcher.Store().GetByKey(pod.Namespace) + if !ok || err != nil { + return nil + } + + namespace, ok := rawNs.(*kubernetes.Namespace) + if !ok { + return nil + } + + annotations := common.MapStr{} + for k, v := range namespace.GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + return annotations +} + +// newNamespacePodUpdater creates a namespacePodUpdater +func newNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater { + return &namespacePodUpdater{ + handler: handler, + store: store, + locker: locker, + } +} + +// OnUpdate handles update events on namespaces. +func (n *namespacePodUpdater) OnUpdate(obj interface{}) { + ns, ok := obj.(*kubernetes.Namespace) + if !ok { + return + } + + // n.store.List() returns a snapshot at this point. If a delete is received + // from the main watcher, this loop may generate an update event after the + // delete is processed, leaving configurations that would never be deleted. + // Also this loop can miss updates, what could leave outdated configurations. + // Avoid these issues by locking the processing of events from the main watcher. + if n.locker != nil { + n.locker.Lock() + defer n.locker.Unlock() + } + for _, pod := range n.store.List() { + pod, ok := pod.(*kubernetes.Pod) + if ok && pod.Namespace == ns.Name { + n.handler(pod) + } + } +} + +// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this +// namespace they will generate their own add events. +func (*namespacePodUpdater) OnAdd(interface{}) {} + +// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this +// namespace they will generate their own delete events. +func (*namespacePodUpdater) OnDelete(interface{}) {} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index fcbb13aa8a9d..8e0e2625d55f 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -137,7 +137,7 @@ func generateServiceData(service *kubernetes.Service, cfg *Config, kubeMetaGen m // k8sMapping includes only the metadata that fall under kubernetes.* // and these are available as dynamic vars through the provider - k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) // add annotations to be discoverable by templates k8sMapping["annotations"] = annotations From 5132b5e1fd257d362cfdd7beac183801d01ae409 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 13 Sep 2021 12:59:28 +0300 Subject: [PATCH 07/16] Fix tests Signed-off-by: chrismark --- .../providers/kubernetes/node_test.go | 89 ++++++++-- .../composable/providers/kubernetes/pod.go | 20 ++- .../providers/kubernetes/pod_test.go | 160 +++++++++++++++--- .../providers/kubernetes/service_test.go | 91 +++++++++- 4 files changed, 312 insertions(+), 48 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go index 68c35878490e..cbedc6e9c699 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go @@ -7,6 +7,8 @@ package kubernetes import ( "testing" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common" "github.com/stretchr/testify/assert" @@ -41,10 +43,10 @@ func TestGenerateNodeData(t *testing.T) { }, } - data := generateNodeData(node, &Config{}) + data := generateNodeData(node, &Config{}, &nodeMeta{}) mapping := map[string]interface{}{ - "node": map[string]interface{}{ + "node": common.MapStr{ "uid": string(node.GetUID()), "name": node.GetName(), "labels": common.MapStr{ @@ -55,18 +57,85 @@ func TestGenerateNodeData(t *testing.T) { }, "ip": "node1", }, + "annotations": common.MapStr{ + "baz": "ban", + }, } - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, + processors := map[string]interface{}{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "node": common.MapStr{ + "annotations": common.MapStr{"baz": "ban"}, + "ip": "node1", + "labels": common.MapStr{"foo": "bar"}, + "name": "testnode", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}, }, } - assert.Equal(t, node, data.node) assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } +} + +type nodeMeta struct{} + +// Generate generates node metadata from a resource object +// Metadata map is in the following form: +// { +// "kubernetes": {}, +// "some.ecs.field": "asdf" +// } +// All Kubernetes fields that need to be stored under kubernetes. prefix are populated by +// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method +func (n *nodeMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + ecsFields := n.GenerateECS(obj) + meta := common.MapStr{ + "kubernetes": n.GenerateK8s(obj, opts...), + } + meta.DeepUpdate(ecsFields) + return meta +} + +// GenerateECS generates node ECS metadata from a resource object +func (n *nodeMeta) GenerateECS(obj kubernetes.Resource) common.MapStr { + return common.MapStr{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090", + }, + }, + } +} + +// GenerateK8s generates node metadata from a resource object +func (n *nodeMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + k8sNode := obj.(*kubernetes.Node) + return common.MapStr{ + "node": common.MapStr{ + "uid": string(k8sNode.GetUID()), + "name": k8sNode.GetName(), + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + "ip": "node1", + }, + } +} + +// GenerateFromName generates node metadata from a node name +func (n *nodeMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr { + return nil } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 85139d04aa2a..b7ca066996ec 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -119,7 +119,9 @@ func NewPodWatcher( func (p *pod) emitRunning(pod *kubernetes.Pod) { - data := generatePodData(pod, p.config, p.metagen, p.namespaceWatcher) + namespaceAnnotations := podNamespaceAnnotations(pod, p.namespaceWatcher) + + data := generatePodData(pod, p.config, p.metagen, namespaceAnnotations) data.mapping["scope"] = p.scope // Emit the pod // We emit Pod + containers to ensure that configs matching Pod only @@ -208,7 +210,11 @@ func (p *pod) OnDelete(obj interface{}) { time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) } -func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMetaGen metadata.MetaGen, namespaceWatcher kubernetes.Watcher) providerData { +func generatePodData( + pod *kubernetes.Pod, + cfg *Config, + kubeMetaGen metadata.MetaGen, + namespaceAnnotations common.MapStr) providerData { meta := kubeMetaGen.Generate(pod) kubemetaMap, err := meta.GetValue("kubernetes") @@ -218,13 +224,19 @@ func generatePodData(pod *kubernetes.Pod, cfg *Config, kubeMetaGen metadata.Meta // k8sMapping includes only the metadata that fall under kubernetes.* // and these are available as dynamic vars through the provider - k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr)) + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) - namespaceAnnotations := podNamespaceAnnotations(pod, namespaceWatcher) if len(namespaceAnnotations) != 0 { k8sMapping["namespace_annotations"] = namespaceAnnotations } + // Pass annotations to all events so that it can be used in templating and by annotation builders. + annotations := common.MapStr{} + for k, v := range pod.GetObjectMeta().GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + k8sMapping["annotations"] = annotations + processors := []map[string]interface{}{} // meta map includes metadata that go under kubernetes.* // but also other ECS fields like orchestrator.* diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go index 00c7ee84766e..7e52b7418843 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -9,6 +9,8 @@ import ( "fmt" "testing" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common" "github.com/stretchr/testify/assert" @@ -44,11 +46,14 @@ func TestGeneratePodData(t *testing.T) { Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, } - data := generatePodData(pod, &Config{}) + namespaceAnnotations := common.MapStr{ + "nsa": "nsb", + } + data := generatePodData(pod, &Config{}, &podMeta{}, namespaceAnnotations) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ + "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), "labels": common.MapStr{ @@ -59,19 +64,37 @@ func TestGeneratePodData(t *testing.T) { }, "ip": pod.Status.PodIP, }, - } - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, + "annotations": common.MapStr{ + "app": "production", }, } + processors := map[string]interface{}{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "namespace": "testns", + "namespace_annotations": common.MapStr{"nsa": "nsb"}, + "pod": common.MapStr{ + "annotations": common.MapStr{"app": "production"}, + "ip": "127.0.0.5", + "labels": common.MapStr{"foo": "bar"}, + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + } assert.Equal(t, string(pod.GetUID()), data.uid) assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } } func TestGenerateContainerPodData(t *testing.T) { @@ -129,40 +152,67 @@ func TestGenerateContainerPodData(t *testing.T) { pod, containers, containerStatuses, - &Config{}) + &Config{}, + &podMeta{}) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), - "pod": map[string]interface{}{ + "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), "labels": common.MapStr{ "foo": "bar", }, + "annotations": common.MapStr{ + "app": "production", + }, "ip": pod.Status.PodIP, }, - "container": map[string]interface{}{ - "id": "asdfghdeadbeef", - "name": "nginx", - "image": "nginx:1.120", - "runtime": "crio", + "container": common.MapStr{ + "id": "asdfghdeadbeef", + "name": "nginx", + "image": "nginx:1.120", + "runtime": "crio", + "port": "80", + "port_name": "http", }, - } - - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", - }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, + "annotations": common.MapStr{ + "app": "production", }, } + processors := map[string]interface{}{ + "container": common.MapStr{ + "id": "asdfghdeadbeef", + "image": common.MapStr{"name": "nginx:1.120"}, + "runtime": "crio", + }, "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "namespace": "testns", + "namespace_annotations": common.MapStr{"nsa": "nsb"}, + "pod": common.MapStr{ + "annotations": common.MapStr{"app": "production"}, + "ip": "127.0.0.5", + "labels": common.MapStr{"foo": "bar"}, + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + } cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") data := <-providerDataChan assert.Equal(t, cuid, data.uid) assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } } @@ -258,3 +308,61 @@ func (t *MockDynamicComm) AddOrUpdate(id string, priority int, mapping map[strin // Remove func (t *MockDynamicComm) Remove(id string) { } + +type podMeta struct{} + +// Generate generates pod metadata from a resource object +// Metadata map is in the following form: +// { +// "kubernetes": {}, +// "some.ecs.field": "asdf" +// } +// All Kubernetes fields that need to be stored under kubernetes. prefix are populated by +// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method +func (p *podMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + ecsFields := p.GenerateECS(obj) + meta := common.MapStr{ + "kubernetes": p.GenerateK8s(obj, opts...), + } + meta.DeepUpdate(ecsFields) + return meta +} + +// GenerateECS generates pod ECS metadata from a resource object +func (p *podMeta) GenerateECS(obj kubernetes.Resource) common.MapStr { + return common.MapStr{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090", + }, + }, + } +} + +// GenerateK8s generates pod metadata from a resource object +func (p *podMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + k8sPod := obj.(*kubernetes.Pod) + return common.MapStr{ + "namespace": k8sPod.GetNamespace(), + "pod": common.MapStr{ + "uid": string(k8sPod.GetUID()), + "name": k8sPod.GetName(), + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "app": "production", + }, + "ip": k8sPod.Status.PodIP, + }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, + } +} + +// GenerateFromName generates pod metadata from a node name +func (p *podMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr { + return nil +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go index c52a1069728e..ae29d90fe6df 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go @@ -7,6 +7,8 @@ package kubernetes import ( "testing" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common" "github.com/stretchr/testify/assert" @@ -45,10 +47,10 @@ func TestGenerateServiceData(t *testing.T) { }, } - data := generateServiceData(service, &Config{}) + data := generateServiceData(service, &Config{}, &svcMeta{}) mapping := map[string]interface{}{ - "service": map[string]interface{}{ + "service": common.MapStr{ "uid": string(service.GetUID()), "name": service.GetName(), "labels": common.MapStr{ @@ -59,18 +61,91 @@ func TestGenerateServiceData(t *testing.T) { }, "ip": service.Spec.ClusterIP, }, + "annotations": common.MapStr{ + "baz": "ban", + }, } - processors := []map[string]interface{}{ - { - "add_fields": map[string]interface{}{ - "fields": mapping, - "target": "kubernetes", + processors := map[string]interface{}{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "service": common.MapStr{ + "uid": string(service.GetUID()), + "name": service.GetName(), + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + "ip": "1.2.3.4", }, }, } assert.Equal(t, service, data.service) assert.Equal(t, mapping, data.mapping) - assert.Equal(t, processors, data.processors) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } +} + +type svcMeta struct{} + +// Generate generates svc metadata from a resource object +// Metadata map is in the following form: +// { +// "kubernetes": {}, +// "some.ecs.field": "asdf" +// } +// All Kubernetes fields that need to be stored under kubernetes. prefix are populated by +// GenerateK8s method while fields that are part of ECS are generated by GenerateECS method +func (s *svcMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + ecsFields := s.GenerateECS(obj) + meta := common.MapStr{ + "kubernetes": s.GenerateK8s(obj, opts...), + } + meta.DeepUpdate(ecsFields) + return meta +} + +// GenerateECS generates svc ECS metadata from a resource object +func (s *svcMeta) GenerateECS(obj kubernetes.Resource) common.MapStr { + return common.MapStr{ + "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090", + }, + }, + } +} + +// GenerateK8s generates svc metadata from a resource object +func (s *svcMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOptions) common.MapStr { + k8sNode := obj.(*kubernetes.Service) + return common.MapStr{ + "service": common.MapStr{ + "uid": string(k8sNode.GetUID()), + "name": k8sNode.GetName(), + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, + "ip": "1.2.3.4", + }, + } +} + +// GenerateFromName generates svc metadata from a node name +func (s *svcMeta) GenerateFromName(name string, opts ...metadata.FieldOptions) common.MapStr { + return nil } From e2fec8275e116e0046438bfa0f7aa9c1ddb1a266 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 13 Sep 2021 13:02:03 +0300 Subject: [PATCH 08/16] Add changelog Signed-off-by: chrismark --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 367c8059a372..2ebee0ff9ee9 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -136,4 +136,5 @@ - Add new --enroll-delay option for install and enroll commands. {pull}27118[27118] - Add link to troubleshooting guide on fatal exits. {issue}26367[26367] {pull}27236[27236] - Agent now adapts the beats queue size based on output settings. {issue}26638[26638] {pull}27429[27429] -- Support ephemeral containers in Kubernetes dynamic provider. {issue}#27020[#27020] {pull}27707[27707] +- Support ephemeral containers in Kubernetes dynamic provider. {issue}27020[#27020] {pull}27707[27707] +- Add complete k8s metadata through composable provider. {pull}27691[27691] From 848c275cafb1b1ac8a26f58bb6eb0f266cadc3d3 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 13 Sep 2021 15:55:18 +0300 Subject: [PATCH 09/16] Handle pod termination better Signed-off-by: chrismark --- .../composable/providers/kubernetes/pod.go | 81 +++++++++++++++++-- 1 file changed, 76 insertions(+), 5 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index b7ca066996ec..652dd82da683 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -42,6 +42,13 @@ type providerData struct { processors []map[string]interface{} } +type containerInPod struct { + id string + runtime string + spec kubernetes.Container + status kubernetes.PodContainerStatus +} + // podUpdaterHandlerFunc is a function that handles pod updater notifications. type podUpdaterHandlerFunc func(interface{}) @@ -187,13 +194,10 @@ func (p *pod) unlockedUpdate(obj interface{}) { pod := obj.(*kubernetes.Pod) p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) - switch pod.Status.Phase { - case kubernetes.PodSucceeded, kubernetes.PodFailed: + + if podTerminated(pod, getContainersInPod(pod)) { time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) return - case kubernetes.PodPending: - p.logger.Debugf("pod update (pending): don't know what to do with this pod yet, skipping for now: %+v", obj) - return } p.logger.Debugf("pod update: %+v", obj) @@ -431,3 +435,70 @@ func (*namespacePodUpdater) OnAdd(interface{}) {} // OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this // namespace they will generate their own delete events. func (*namespacePodUpdater) OnDelete(interface{}) {} + +// podTerminating returns true if a pod is marked for deletion or is in a phase beyond running. +func podTerminating(pod *kubernetes.Pod) bool { + if pod.GetObjectMeta().GetDeletionTimestamp() != nil { + return true + } + + switch pod.Status.Phase { + case kubernetes.PodRunning, kubernetes.PodPending: + default: + return true + } + + return false +} + +// podTerminated returns true if a pod is terminated, this method considers a +// pod as terminated if none of its containers are running (or going to be running). +func podTerminated(pod *kubernetes.Pod, containers []*containerInPod) bool { + // Pod is not marked for termination, so it is not terminated. + if !podTerminating(pod) { + return false + } + + // If any container is running, the pod is not terminated yet. + for _, container := range containers { + if container.status.State.Running != nil { + return false + } + } + + return true +} + +// getContainersInPod returns all the containers defined in a pod and their statuses. +// It includes init and ephemeral containers. +func getContainersInPod(pod *kubernetes.Pod) []*containerInPod { + var containers []*containerInPod + for _, c := range pod.Spec.Containers { + containers = append(containers, &containerInPod{spec: c}) + } + for _, c := range pod.Spec.InitContainers { + containers = append(containers, &containerInPod{spec: c}) + } + for _, c := range pod.Spec.EphemeralContainers { + c := kubernetes.Container(c.EphemeralContainerCommon) + containers = append(containers, &containerInPod{spec: c}) + } + + statuses := make(map[string]*kubernetes.PodContainerStatus) + mapStatuses := func(s []kubernetes.PodContainerStatus) { + for i := range s { + statuses[s[i].Name] = &s[i] + } + } + mapStatuses(pod.Status.ContainerStatuses) + mapStatuses(pod.Status.InitContainerStatuses) + mapStatuses(pod.Status.EphemeralContainerStatuses) + for _, c := range containers { + if s, ok := statuses[c.spec.Name]; ok { + c.id, c.runtime = kubernetes.ContainerIDWithRuntime(*s) + c.status = *s + } + } + + return containers +} From 818e2963be125bae3c64aa283254fd03f52a25c7 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 14 Sep 2021 12:54:57 +0300 Subject: [PATCH 10/16] Code improvements Signed-off-by: chrismark --- libbeat/cmd/instance/locker.go | 2 +- .../providers/kubernetes/kubernetes.go | 47 ++++++------ .../composable/providers/kubernetes/node.go | 12 ++-- .../composable/providers/kubernetes/pod.go | 71 ++++++------------- .../providers/kubernetes/pod_test.go | 50 ++++++------- .../providers/kubernetes/service.go | 9 ++- 6 files changed, 82 insertions(+), 109 deletions(-) diff --git a/libbeat/cmd/instance/locker.go b/libbeat/cmd/instance/locker.go index d8a24a02423f..ebf14c43ac4e 100644 --- a/libbeat/cmd/instance/locker.go +++ b/libbeat/cmd/instance/locker.go @@ -44,7 +44,7 @@ func newLocker(b *Beat) *locker { } } -// lock attemps to acquire a lock on the data path for the currently-running +// lock attempts to acquire a lock on the data path for the currently-running // Beat instance. If another Beats instance already has a lock on the same data path // an ErrAlreadyLocked error is returned. func (l *locker) lock() error { diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index ebf14723c457..d71698770033 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -9,8 +9,6 @@ import ( k8s "k8s.io/client-go/kubernetes" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/kubernetes" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" @@ -34,9 +32,8 @@ func init() { } type dynamicProvider struct { - logger *logger.Logger - config *Config - rawConfig *common.Config + logger *logger.Logger + config *Config } // DynamicProviderBuilder builds the dynamic provider. @@ -49,29 +46,26 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable if err != nil { return nil, errors.New(err, "failed to unpack configuration") } - rawConfig, err := common.NewConfigFrom(c) - if err != nil { - return nil, errors.New(err, "failed to unpack configuration") - } - return &dynamicProvider{logger, &cfg, rawConfig}, nil + + return &dynamicProvider{logger, &cfg}, nil } // Run runs the kubernetes context provider. func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error { if p.config.Resources.Pod.Enabled { - err := p.watchResource(comm, "pod", p.config) + err := p.watchResource(comm, "pod") if err != nil { return err } } if p.config.Resources.Node.Enabled { - err := p.watchResource(comm, "node", p.config) + err := p.watchResource(comm, "node") if err != nil { return err } } if p.config.Resources.Service.Enabled { - err := p.watchResource(comm, "service", p.config) + err := p.watchResource(comm, "service") if err != nil { return err } @@ -83,9 +77,8 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error { // and starts watching for such resource's events. func (p *dynamicProvider) watchResource( comm composable.DynamicProviderComm, - resourceType string, - config *Config) error { - client, err := kubernetes.GetKubernetesClient(config.KubeConfig) + resourceType string) error { + client, err := kubernetes.GetKubernetesClient(p.config.KubeConfig) if err != nil { // info only; return nil (do nothing) p.logger.Debugf("Kubernetes provider for resource %s skipped, unable to connect: %s", resourceType, err) @@ -100,24 +93,24 @@ func (p *dynamicProvider) watchResource( p.logger.Debugf( "Initializing Kubernetes watcher for resource %s using node: %v", resourceType, - config.Node) + p.config.Node) nd := &kubernetes.DiscoverKubernetesNodeParams{ - ConfigHost: config.Node, + ConfigHost: p.config.Node, Client: client, - IsInCluster: kubernetes.IsInCluster(config.KubeConfig), + IsInCluster: kubernetes.IsInCluster(p.config.KubeConfig), HostUtils: &kubernetes.DefaultDiscoveryUtils{}, } - config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd) + p.config.Node, err = kubernetes.DiscoverKubernetesNode(p.logger, nd) if err != nil { p.logger.Debugf("Kubernetes provider skipped, unable to discover node: %w", err) return nil } } else { - config.Node = "" + p.config.Node = "" } - watcher, err := p.newWatcher(resourceType, comm, client, config) + watcher, err := p.newWatcher(resourceType, comm, client) if err != nil { return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType) } @@ -133,23 +126,23 @@ func (p *dynamicProvider) watchResource( func (p *dynamicProvider) newWatcher( resourceType string, comm composable.DynamicProviderComm, - client k8s.Interface, - config *Config) (kubernetes.Watcher, error) { + client k8s.Interface) (kubernetes.Watcher, error) { + //rawConfig, err := common.NewConfigFrom(c) switch resourceType { case "pod": - watcher, err := NewPodWatcher(comm, config, p.logger, client, p.config.Scope, p.rawConfig) + watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } return watcher, nil case "node": - watcher, err := NewNodeWatcher(comm, config, p.logger, client, p.config.Scope, p.rawConfig) + watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } return watcher, nil case "service": - watcher, err := NewServiceWatcher(comm, config, p.logger, client, p.config.Scope, p.rawConfig) + watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index 6808f96a89e2..17802735d070 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -7,15 +7,14 @@ package kubernetes import ( "time" - "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" - "github.com/elastic/beats/v7/libbeat/common/safemapstr" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" + "github.com/elastic/beats/v7/libbeat/common/safemapstr" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" @@ -42,8 +41,7 @@ func NewNodeWatcher( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string, - rawConfig *common.Config) (kubernetes.Watcher, error) { + scope string) (kubernetes.Watcher, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -54,6 +52,10 @@ func NewNodeWatcher( return nil, errors.New(err, "couldn't create kubernetes watcher") } + rawConfig, err := common.NewConfigFrom(cfg) + if err != nil { + return nil, errors.New(err, "failed to unpack configuration") + } metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client) watcher.AddEventHandler(&node{ logger, diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 652dd82da683..75453136f5cd 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -71,8 +71,7 @@ func NewPodWatcher( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string, - rawConfig *common.Config) (kubernetes.Watcher, error) { + scope string) (kubernetes.Watcher, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -102,6 +101,10 @@ func NewPodWatcher( logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) } + rawConfig, err := common.NewConfigFrom(cfg) + if err != nil { + return nil, errors.New(err, "failed to unpack configuration") + } metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf) p := pod{ @@ -136,22 +139,12 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors) // Emit all containers in the pod - p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses) - // TODO: deal with init containers stopping after initialization - p.emitContainers(pod, pod.Spec.InitContainers, pod.Status.InitContainerStatuses) - - // Get ephemeral containers and their status - ephContainers, ephContainersStatuses := getEphemeralContainers(pod) - p.emitContainers(pod, ephContainers, ephContainersStatuses) - + p.emitContainers(pod) } -func (p *pod) emitContainers( - pod *kubernetes.Pod, - containers []kubernetes.Container, - containerstatuses []kubernetes.PodContainerStatus) { - generateContainerData(p.comm, pod, containers, containerstatuses, p.config, p.metagen) +func (p *pod) emitContainers(pod *kubernetes.Pod) { + generateContainerData(p.comm, pod, p.config, p.metagen) } func (p *pod) emitStopped(pod *kubernetes.Pod) { @@ -264,18 +257,10 @@ func generatePodData( func generateContainerData( comm composable.DynamicProviderComm, pod *kubernetes.Pod, - containers []kubernetes.Container, - containerstatuses []kubernetes.PodContainerStatus, cfg *Config, kubeMetaGen metadata.MetaGen) { - containerIDs := map[string]string{} - runtimes := map[string]string{} - for _, c := range containerstatuses { - cid, runtime := kubernetes.ContainerIDWithRuntime(c) - containerIDs[c.Name] = cid - runtimes[c.Name] = runtime - } + containers := getContainersInPod(pod) // Pass annotations to all events so that it can be used in templating and by annotation builders. annotations := common.MapStr{} @@ -287,15 +272,14 @@ func generateContainerData( // If it doesn't have an ID, container doesn't exist in // the runtime, emit only an event if we are stopping, so // we are sure of cleaning up configurations. - cid := containerIDs[c.Name] - if cid == "" { + if c.id == "" { continue } // ID is the combination of pod UID + container name - eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Name) + eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.spec.Name) - meta := kubeMetaGen.Generate(pod, metadata.WithFields("container.name", c.Name)) + meta := kubeMetaGen.Generate(pod, metadata.WithFields("container.name", c.spec.Name)) kubemetaMap, err := meta.GetValue("kubernetes") if err != nil { continue @@ -310,10 +294,10 @@ func generateContainerData( //container ECS fields cmeta := common.MapStr{ - "id": cid, - "runtime": runtimes[c.Name], + "id": c.id, + "runtime": c.runtime, "image": common.MapStr{ - "name": c.Image, + "name": c.spec.Image, }, } @@ -340,13 +324,13 @@ func generateContainerData( // add container metadata under kubernetes.container.* to // make them available to dynamic var resolution containerMeta := common.MapStr{ - "id": cid, - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], + "id": c.id, + "name": c.spec.Name, + "image": c.spec.Image, + "runtime": c.runtime, } - if len(c.Ports) > 0 { - for _, port := range c.Ports { + if len(c.spec.Ports) > 0 { + for _, port := range c.spec.Ports { containerMeta.Put("port", fmt.Sprintf("%v", port.ContainerPort)) containerMeta.Put("port_name", port.Name) k8sMapping["container"] = containerMeta @@ -359,19 +343,6 @@ func generateContainerData( } } -func getEphemeralContainers(pod *kubernetes.Pod) ([]kubernetes.Container, []kubernetes.PodContainerStatus) { - var ephContainers []kubernetes.Container - var ephContainersStatuses []kubernetes.PodContainerStatus - for _, c := range pod.Spec.EphemeralContainers { - c := kubernetes.Container(c.EphemeralContainerCommon) - ephContainers = append(ephContainers, c) - } - for _, s := range pod.Status.EphemeralContainerStatuses { - ephContainersStatuses = append(ephContainersStatuses, s) - } - return ephContainers, ephContainersStatuses -} - // podNamespaceAnnotations returns the annotations of the namespace of the pod func podNamespaceAnnotations(pod *kubernetes.Pod, watcher kubernetes.Watcher) common.MapStr { if watcher == nil { diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go index 7e52b7418843..3b24a7b33ab5 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -99,6 +99,26 @@ func TestGeneratePodData(t *testing.T) { func TestGenerateContainerPodData(t *testing.T) { uid := "005f3b90-4b9d-12f8-acf0-31020a840133" + containers := []kubernetes.Container{ + { + Name: "nginx", + Image: "nginx:1.120", + Ports: []kubernetes.ContainerPort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + ContainerPort: 80, + }, + }, + }, + } + containerStatuses := []kubernetes.PodContainerStatus{ + { + Name: "nginx", + Ready: true, + ContainerID: "crio://asdfghdeadbeef", + }, + } pod := &kubernetes.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "testpod", @@ -116,33 +136,17 @@ func TestGenerateContainerPodData(t *testing.T) { APIVersion: "v1", }, Spec: kubernetes.PodSpec{ - NodeName: "testnode", + NodeName: "testnode", + Containers: containers, + }, + Status: kubernetes.PodStatus{ + PodIP: "127.0.0.5", + ContainerStatuses: containerStatuses, }, - Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, } providerDataChan := make(chan providerData, 1) - containers := []kubernetes.Container{ - { - Name: "nginx", - Image: "nginx:1.120", - Ports: []kubernetes.ContainerPort{ - { - Name: "http", - Protocol: v1.ProtocolTCP, - ContainerPort: 80, - }, - }, - }, - } - containerStatuses := []kubernetes.PodContainerStatus{ - { - Name: "nginx", - Ready: true, - ContainerID: "crio://asdfghdeadbeef", - }, - } comm := MockDynamicComm{ context.TODO(), providerDataChan, @@ -150,8 +154,6 @@ func TestGenerateContainerPodData(t *testing.T) { generateContainerData( &comm, pod, - containers, - containerStatuses, &Config{}, &podMeta{}) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index 8e0e2625d55f..0f8cdfe0edb0 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -41,8 +41,7 @@ func NewServiceWatcher( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string, - rawConfig *common.Config) (kubernetes.Watcher, error) { + scope string) (kubernetes.Watcher, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -61,6 +60,12 @@ func NewServiceWatcher( return nil, fmt.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Namespace{}, err) } namespaceMeta := metadata.NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store(), client) + + rawConfig, err := common.NewConfigFrom(cfg) + if err != nil { + return nil, errors.New(err, "failed to unpack configuration") + } + metaGen := metadata.NewServiceMetadataGenerator(rawConfig, watcher.Store(), namespaceMeta, client) watcher.AddEventHandler(&service{ logger, From 2699f16c24d3fb6d2b2c83d20ff5fc5f54f988e1 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 14 Sep 2021 14:41:25 +0300 Subject: [PATCH 11/16] Remove leftover Signed-off-by: chrismark --- .../pkg/composable/providers/kubernetes/kubernetes.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index d71698770033..c43e5f984300 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -127,7 +127,6 @@ func (p *dynamicProvider) newWatcher( resourceType string, comm composable.DynamicProviderComm, client k8s.Interface) (kubernetes.Watcher, error) { - //rawConfig, err := common.NewConfigFrom(c) switch resourceType { case "pod": watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope) From 5002ce5f99271543fec76e8464b7bc1617fc8bb7 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 15 Sep 2021 17:38:29 +0300 Subject: [PATCH 12/16] Small fixes Signed-off-by: chrismark --- .../pkg/composable/providers/kubernetes/pod.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 75453136f5cd..a41b2c008a15 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -185,15 +185,6 @@ func (p *pod) OnUpdate(obj interface{}) { func (p *pod) unlockedUpdate(obj interface{}) { p.logger.Debugf("Watcher Pod update: %+v", obj) pod := obj.(*kubernetes.Pod) - - p.logger.Debugf("pod update for pod: %+v, status: %+v", pod.Name, pod.Status.Phase) - - if podTerminated(pod, getContainersInPod(pod)) { - time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) - return - } - - p.logger.Debugf("pod update: %+v", obj) p.emitRunning(pod) } @@ -204,7 +195,13 @@ func (p *pod) OnDelete(obj interface{}) { p.logger.Debugf("pod delete: %+v", obj) pod := obj.(*kubernetes.Pod) - time.AfterFunc(p.cleanupTimeout, func() { p.emitStopped(pod) }) + // TODO: verify if we need this check here + if podTerminated(pod, getContainersInPod(pod)) { + time.AfterFunc(p.cleanupTimeout, func() { + p.emitStopped(pod) + }) + return + } } func generatePodData( From acab1a11c111e680ec1ac103ad123c6f0c0bb099 Mon Sep 17 00:00:00 2001 From: chrismark Date: Wed, 15 Sep 2021 17:49:22 +0300 Subject: [PATCH 13/16] Tune tests Signed-off-by: chrismark --- .../providers/kubernetes/pod_test.go | 171 +++++++++++------- 1 file changed, 106 insertions(+), 65 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go index 3b24a7b33ab5..718b05dbd614 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -218,77 +218,118 @@ func TestGenerateContainerPodData(t *testing.T) { } -func TestGetEphemeralContainers(t *testing.T) { - name := "filebeat" - namespace := "default" - podIP := "127.0.0.1" - containerID := "docker://foobar" +func TestEphemeralContainers(t *testing.T) { uid := "005f3b90-4b9d-12f8-acf0-31020a840133" - containerImage := "elastic/filebeat:6.3.0" - node := "node" - - expectedEphemeralContainers := - []kubernetes.Container{ - { - Name: "filebeat", - Image: "elastic/filebeat:6.3.0", - }, - } - expectedephemeralContainersStatuses := - []kubernetes.PodContainerStatus{ - { - Name: "filebeat", - State: v1.ContainerState{ - Running: &v1.ContainerStateRunning{ - StartedAt: metav1.Time{}, - }, - }, - Ready: false, - ContainerID: "docker://foobar", + containers := []v1.EphemeralContainer{ + { + EphemeralContainerCommon: v1.EphemeralContainerCommon{ + Image: "nginx:1.120", + Name: "nginx", }, - } - - pod := - &kubernetes.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - UID: types.UID(uid), - Namespace: namespace, - Labels: map[string]string{}, - Annotations: map[string]string{}, + }, + } + containerStatuses := []kubernetes.PodContainerStatus{ + { + Name: "nginx", + Ready: true, + ContainerID: "crio://asdfghdeadbeef", + }, + } + pod := &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", }, - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", + Annotations: map[string]string{ + "app": "production", }, - Status: v1.PodStatus{ - PodIP: podIP, - Phase: kubernetes.PodRunning, - EphemeralContainerStatuses: []kubernetes.PodContainerStatus{ - { - Name: name, - ContainerID: containerID, - State: v1.ContainerState{ - Running: &v1.ContainerStateRunning{}, - }, - }, - }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: kubernetes.PodSpec{ + NodeName: "testnode", + EphemeralContainers: containers, + }, + Status: kubernetes.PodStatus{ + PodIP: "127.0.0.5", + EphemeralContainerStatuses: containerStatuses, + }, + } + + providerDataChan := make(chan providerData, 1) + + comm := MockDynamicComm{ + context.TODO(), + providerDataChan, + } + generateContainerData( + &comm, + pod, + &Config{}, + &podMeta{}) + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": common.MapStr{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "labels": common.MapStr{ + "foo": "bar", }, - Spec: v1.PodSpec{ - NodeName: node, - EphemeralContainers: []v1.EphemeralContainer{ - { - EphemeralContainerCommon: v1.EphemeralContainerCommon{ - Image: containerImage, - Name: name, - }, - }, - }, + "annotations": common.MapStr{ + "app": "production", }, - } - ephContainers, ephContainersStatuses := getEphemeralContainers(pod) - assert.Equal(t, expectedEphemeralContainers, ephContainers) - assert.Equal(t, expectedephemeralContainersStatuses, ephContainersStatuses) + "ip": pod.Status.PodIP, + }, + "container": common.MapStr{ + "id": "asdfghdeadbeef", + "name": "nginx", + "image": "nginx:1.120", + "runtime": "crio", + }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, + "annotations": common.MapStr{ + "app": "production", + }, + } + + processors := map[string]interface{}{ + "container": common.MapStr{ + "id": "asdfghdeadbeef", + "image": common.MapStr{"name": "nginx:1.120"}, + "runtime": "crio", + }, "orchestrator": common.MapStr{ + "cluster": common.MapStr{ + "name": "devcluster", + "url": "8.8.8.8:9090"}, + }, "kubernetes": common.MapStr{ + "namespace": "testns", + "namespace_annotations": common.MapStr{"nsa": "nsb"}, + "pod": common.MapStr{ + "annotations": common.MapStr{"app": "production"}, + "ip": "127.0.0.5", + "labels": common.MapStr{"foo": "bar"}, + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + } + cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") + data := <-providerDataChan + assert.Equal(t, cuid, data.uid) + assert.Equal(t, mapping, data.mapping) + for _, v := range data.processors { + k := v["add_fields"].(map[string]interface{}) + target := k["target"].(string) + fields := k["fields"] + assert.Equal(t, processors[target], fields) + } + } // MockDynamicComm is used in tests. From 48bc841d791dd97ec6708d7e01708cbe2aaf013e Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 16 Sep 2021 12:00:34 +0300 Subject: [PATCH 14/16] Fix tests and label handling Signed-off-by: chrismark --- .../providers/kubernetes/node_test.go | 25 +++---- .../composable/providers/kubernetes/pod.go | 15 +++-- .../providers/kubernetes/pod_test.go | 67 +++++++++---------- .../providers/kubernetes/service.go | 59 ++++++++++++---- .../providers/kubernetes/service_test.go | 44 ++++++------ 5 files changed, 124 insertions(+), 86 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go index cbedc6e9c699..8fd0459876ff 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go @@ -49,17 +49,14 @@ func TestGenerateNodeData(t *testing.T) { "node": common.MapStr{ "uid": string(node.GetUID()), "name": node.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "baz": "ban", - }, "ip": "node1", }, "annotations": common.MapStr{ "baz": "ban", }, + "labels": common.MapStr{ + "foo": "bar", + }, } processors := map[string]interface{}{ @@ -68,10 +65,10 @@ func TestGenerateNodeData(t *testing.T) { "name": "devcluster", "url": "8.8.8.8:9090"}, }, "kubernetes": common.MapStr{ + "labels": common.MapStr{"foo": "bar"}, + "annotations": common.MapStr{"baz": "ban"}, "node": common.MapStr{ - "annotations": common.MapStr{"baz": "ban"}, "ip": "node1", - "labels": common.MapStr{"foo": "bar"}, "name": "testnode", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}, }, @@ -124,14 +121,14 @@ func (n *nodeMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOp "node": common.MapStr{ "uid": string(k8sNode.GetUID()), "name": k8sNode.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "baz": "ban", - }, "ip": "node1", }, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index a41b2c008a15..bd0364f5797a 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -140,11 +140,11 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { // Emit all containers in the pod // TODO: deal with init containers stopping after initialization - p.emitContainers(pod) + p.emitContainers(pod, namespaceAnnotations) } -func (p *pod) emitContainers(pod *kubernetes.Pod) { - generateContainerData(p.comm, pod, p.config, p.metagen) +func (p *pod) emitContainers(pod *kubernetes.Pod, namespaceAnnotations common.MapStr) { + generateContainerData(p.comm, pod, p.config, p.metagen, namespaceAnnotations) } func (p *pod) emitStopped(pod *kubernetes.Pod) { @@ -221,6 +221,7 @@ func generatePodData( k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) if len(namespaceAnnotations) != 0 { + // TODO: convert it to namespace.annotations for 8.0 k8sMapping["namespace_annotations"] = namespaceAnnotations } @@ -255,7 +256,8 @@ func generateContainerData( comm composable.DynamicProviderComm, pod *kubernetes.Pod, cfg *Config, - kubeMetaGen metadata.MetaGen) { + kubeMetaGen metadata.MetaGen, + namespaceAnnotations common.MapStr) { containers := getContainersInPod(pod) @@ -286,6 +288,11 @@ func generateContainerData( // and these are available as dynamic vars through the provider k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) + if len(namespaceAnnotations) != 0 { + // TODO: convert it to namespace.annotations for 8.0 + k8sMapping["namespace_annotations"] = namespaceAnnotations + } + // add annotations to be discoverable by templates k8sMapping["annotations"] = annotations diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go index 718b05dbd614..eb6bd216f657 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -56,17 +56,14 @@ func TestGeneratePodData(t *testing.T) { "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "app": "production", - }, "ip": pod.Status.PodIP, }, "namespace_annotations": common.MapStr{ "nsa": "nsb", }, + "labels": common.MapStr{ + "foo": "bar", + }, "annotations": common.MapStr{ "app": "production", }, @@ -79,11 +76,12 @@ func TestGeneratePodData(t *testing.T) { "url": "8.8.8.8:9090"}, }, "kubernetes": common.MapStr{ "namespace": "testns", - "namespace_annotations": common.MapStr{"nsa": "nsb"}, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{"app": "production"}, "pod": common.MapStr{ - "annotations": common.MapStr{"app": "production"}, "ip": "127.0.0.5", - "labels": common.MapStr{"foo": "bar"}, "name": "testpod", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, } @@ -155,19 +153,16 @@ func TestGenerateContainerPodData(t *testing.T) { &comm, pod, &Config{}, - &podMeta{}) + &podMeta{}, + common.MapStr{ + "nsa": "nsb", + },) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "app": "production", - }, "ip": pod.Status.PodIP, }, "container": common.MapStr{ @@ -184,6 +179,9 @@ func TestGenerateContainerPodData(t *testing.T) { "annotations": common.MapStr{ "app": "production", }, + "labels": common.MapStr{ + "foo": "bar", + }, } processors := map[string]interface{}{ @@ -197,11 +195,10 @@ func TestGenerateContainerPodData(t *testing.T) { "url": "8.8.8.8:9090"}, }, "kubernetes": common.MapStr{ "namespace": "testns", - "namespace_annotations": common.MapStr{"nsa": "nsb"}, + "annotations": common.MapStr{"app": "production"}, + "labels": common.MapStr{"foo": "bar"}, "pod": common.MapStr{ - "annotations": common.MapStr{"app": "production"}, "ip": "127.0.0.5", - "labels": common.MapStr{"foo": "bar"}, "name": "testpod", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, } @@ -271,21 +268,21 @@ func TestEphemeralContainers(t *testing.T) { &comm, pod, &Config{}, - &podMeta{}) + &podMeta{}, + common.MapStr{ + "nsa": "nsb", + },) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "app": "production", - }, "ip": pod.Status.PodIP, }, + "labels": common.MapStr{ + "foo": "bar", + }, "container": common.MapStr{ "id": "asdfghdeadbeef", "name": "nginx", @@ -311,11 +308,10 @@ func TestEphemeralContainers(t *testing.T) { "url": "8.8.8.8:9090"}, }, "kubernetes": common.MapStr{ "namespace": "testns", - "namespace_annotations": common.MapStr{"nsa": "nsb"}, + "labels": common.MapStr{"foo": "bar"}, + "annotations": common.MapStr{"app": "production"}, "pod": common.MapStr{ - "annotations": common.MapStr{"app": "production"}, "ip": "127.0.0.5", - "labels": common.MapStr{"foo": "bar"}, "name": "testpod", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, } @@ -391,16 +387,13 @@ func (p *podMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOpt "pod": common.MapStr{ "uid": string(k8sPod.GetUID()), "name": k8sPod.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "app": "production", - }, "ip": k8sPod.Status.PodIP, }, - "namespace_annotations": common.MapStr{ - "nsa": "nsb", + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "app": "production", }, } } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index 0f8cdfe0edb0..03686ca70455 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -21,12 +21,13 @@ import ( ) type service struct { - logger *logp.Logger - cleanupTimeout time.Duration - comm composable.DynamicProviderComm - scope string - config *Config - metagen metadata.MetaGen + logger *logp.Logger + cleanupTimeout time.Duration + comm composable.DynamicProviderComm + scope string + config *Config + metagen metadata.MetaGen + namespaceWatcher kubernetes.Watcher } type serviceData struct { @@ -74,13 +75,15 @@ func NewServiceWatcher( scope, cfg, metaGen, + namespaceWatcher, }) return watcher, nil } func (s *service) emitRunning(service *kubernetes.Service) { - data := generateServiceData(service, s.config, s.metagen) + namespaceAnnotations := svcNamespaceAnnotations(service, s.namespaceWatcher) + data := generateServiceData(service, s.config, s.metagen, namespaceAnnotations) if data == nil { return } @@ -90,6 +93,29 @@ func (s *service) emitRunning(service *kubernetes.Service) { s.comm.AddOrUpdate(string(service.GetUID()), ServicePriority, data.mapping, data.processors) } +// svcNamespaceAnnotations returns the annotations of the namespace of the service +func svcNamespaceAnnotations(svc *kubernetes.Service, watcher kubernetes.Watcher) common.MapStr { + if watcher == nil { + return nil + } + + rawNs, ok, err := watcher.Store().GetByKey(svc.Namespace) + if !ok || err != nil { + return nil + } + + namespace, ok := rawNs.(*kubernetes.Namespace) + if !ok { + return nil + } + + annotations := common.MapStr{} + for k, v := range namespace.GetAnnotations() { + safemapstr.Put(annotations, k, v) + } + return annotations +} + func (s *service) emitStopped(service *kubernetes.Service) { s.comm.Remove(string(service.GetUID())) } @@ -120,7 +146,11 @@ func (s *service) OnDelete(obj interface{}) { time.AfterFunc(s.cleanupTimeout, func() { s.emitStopped(service) }) } -func generateServiceData(service *kubernetes.Service, cfg *Config, kubeMetaGen metadata.MetaGen) *serviceData { +func generateServiceData( + service *kubernetes.Service, + cfg *Config, + kubeMetaGen metadata.MetaGen, + namespaceAnnotations common.MapStr) *serviceData { host := service.Spec.ClusterIP // If a service doesn't have an IP then dont monitor it @@ -134,16 +164,21 @@ func generateServiceData(service *kubernetes.Service, cfg *Config, kubeMetaGen m return &serviceData{} } + // k8sMapping includes only the metadata that fall under kubernetes.* + // and these are available as dynamic vars through the provider + k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) + + if len(namespaceAnnotations) != 0 { + // TODO: convert it to namespace.annotations for 8.0 + k8sMapping["namespace_annotations"] = namespaceAnnotations + } + // Pass annotations to all events so that it can be used in templating and by annotation builders. annotations := common.MapStr{} for k, v := range service.GetObjectMeta().GetAnnotations() { safemapstr.Put(annotations, k, v) } - // k8sMapping includes only the metadata that fall under kubernetes.* - // and these are available as dynamic vars through the provider - k8sMapping := map[string]interface{}(kubemetaMap.(common.MapStr).Clone()) - // add annotations to be discoverable by templates k8sMapping["annotations"] = annotations diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go index ae29d90fe6df..cab6843c061c 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go @@ -47,23 +47,29 @@ func TestGenerateServiceData(t *testing.T) { }, } - data := generateServiceData(service, &Config{}, &svcMeta{}) + data := generateServiceData( + service, + &Config{}, + &svcMeta{}, + common.MapStr{ + "nsa": "nsb", + },) mapping := map[string]interface{}{ "service": common.MapStr{ "uid": string(service.GetUID()), "name": service.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "baz": "ban", - }, "ip": service.Spec.ClusterIP, }, + "namespace_annotations": common.MapStr{ + "nsa": "nsb", + }, "annotations": common.MapStr{ "baz": "ban", }, + "labels": common.MapStr{ + "foo": "bar", + }, } processors := map[string]interface{}{ @@ -75,14 +81,14 @@ func TestGenerateServiceData(t *testing.T) { "service": common.MapStr{ "uid": string(service.GetUID()), "name": service.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "baz": "ban", - }, "ip": "1.2.3.4", }, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, }, } @@ -134,14 +140,14 @@ func (s *svcMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOpt "service": common.MapStr{ "uid": string(k8sNode.GetUID()), "name": k8sNode.GetName(), - "labels": common.MapStr{ - "foo": "bar", - }, - "annotations": common.MapStr{ - "baz": "ban", - }, "ip": "1.2.3.4", }, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "baz": "ban", + }, } } From aaa4b0e9f90566bb474d7f8008a1a241116486ee Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 16 Sep 2021 15:57:53 +0300 Subject: [PATCH 15/16] Lint Signed-off-by: chrismark --- .../providers/kubernetes/node_test.go | 10 ++--- .../providers/kubernetes/pod_test.go | 42 +++++++++---------- .../providers/kubernetes/service_test.go | 8 ++-- 3 files changed, 29 insertions(+), 31 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go index 8fd0459876ff..f42af49f9881 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node_test.go @@ -49,7 +49,7 @@ func TestGenerateNodeData(t *testing.T) { "node": common.MapStr{ "uid": string(node.GetUID()), "name": node.GetName(), - "ip": "node1", + "ip": "node1", }, "annotations": common.MapStr{ "baz": "ban", @@ -68,9 +68,9 @@ func TestGenerateNodeData(t *testing.T) { "labels": common.MapStr{"foo": "bar"}, "annotations": common.MapStr{"baz": "ban"}, "node": common.MapStr{ - "ip": "node1", - "name": "testnode", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}, + "ip": "node1", + "name": "testnode", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}, }, } assert.Equal(t, node, data.node) @@ -121,7 +121,7 @@ func (n *nodeMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOp "node": common.MapStr{ "uid": string(k8sNode.GetUID()), "name": k8sNode.GetName(), - "ip": "node1", + "ip": "node1", }, "labels": common.MapStr{ "foo": "bar", diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go index eb6bd216f657..62d4a199892e 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod_test.go @@ -9,17 +9,15 @@ import ( "fmt" "testing" - "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/kubernetes" + "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" ) func TestGeneratePodData(t *testing.T) { @@ -56,7 +54,7 @@ func TestGeneratePodData(t *testing.T) { "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), - "ip": pod.Status.PodIP, + "ip": pod.Status.PodIP, }, "namespace_annotations": common.MapStr{ "nsa": "nsb", @@ -75,15 +73,15 @@ func TestGeneratePodData(t *testing.T) { "name": "devcluster", "url": "8.8.8.8:9090"}, }, "kubernetes": common.MapStr{ - "namespace": "testns", + "namespace": "testns", "labels": common.MapStr{ "foo": "bar", }, "annotations": common.MapStr{"app": "production"}, "pod": common.MapStr{ - "ip": "127.0.0.5", - "name": "testpod", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + "ip": "127.0.0.5", + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, } assert.Equal(t, string(pod.GetUID()), data.uid) assert.Equal(t, mapping, data.mapping) @@ -156,14 +154,14 @@ func TestGenerateContainerPodData(t *testing.T) { &podMeta{}, common.MapStr{ "nsa": "nsb", - },) + }) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), - "ip": pod.Status.PodIP, + "ip": pod.Status.PodIP, }, "container": common.MapStr{ "id": "asdfghdeadbeef", @@ -194,13 +192,13 @@ func TestGenerateContainerPodData(t *testing.T) { "name": "devcluster", "url": "8.8.8.8:9090"}, }, "kubernetes": common.MapStr{ - "namespace": "testns", + "namespace": "testns", "annotations": common.MapStr{"app": "production"}, "labels": common.MapStr{"foo": "bar"}, "pod": common.MapStr{ - "ip": "127.0.0.5", - "name": "testpod", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + "ip": "127.0.0.5", + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, } cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") data := <-providerDataChan @@ -271,14 +269,14 @@ func TestEphemeralContainers(t *testing.T) { &podMeta{}, common.MapStr{ "nsa": "nsb", - },) + }) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), "pod": common.MapStr{ "uid": string(pod.GetUID()), "name": pod.GetName(), - "ip": pod.Status.PodIP, + "ip": pod.Status.PodIP, }, "labels": common.MapStr{ "foo": "bar", @@ -307,13 +305,13 @@ func TestEphemeralContainers(t *testing.T) { "name": "devcluster", "url": "8.8.8.8:9090"}, }, "kubernetes": common.MapStr{ - "namespace": "testns", + "namespace": "testns", "labels": common.MapStr{"foo": "bar"}, "annotations": common.MapStr{"app": "production"}, "pod": common.MapStr{ - "ip": "127.0.0.5", - "name": "testpod", - "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, + "ip": "127.0.0.5", + "name": "testpod", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"}}, } cuid := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), "nginx") data := <-providerDataChan @@ -387,7 +385,7 @@ func (p *podMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOpt "pod": common.MapStr{ "uid": string(k8sPod.GetUID()), "name": k8sPod.GetName(), - "ip": k8sPod.Status.PodIP, + "ip": k8sPod.Status.PodIP, }, "labels": common.MapStr{ "foo": "bar", diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go index cab6843c061c..c183541e6a73 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service_test.go @@ -53,13 +53,13 @@ func TestGenerateServiceData(t *testing.T) { &svcMeta{}, common.MapStr{ "nsa": "nsb", - },) + }) mapping := map[string]interface{}{ "service": common.MapStr{ "uid": string(service.GetUID()), "name": service.GetName(), - "ip": service.Spec.ClusterIP, + "ip": service.Spec.ClusterIP, }, "namespace_annotations": common.MapStr{ "nsa": "nsb", @@ -81,7 +81,7 @@ func TestGenerateServiceData(t *testing.T) { "service": common.MapStr{ "uid": string(service.GetUID()), "name": service.GetName(), - "ip": "1.2.3.4", + "ip": "1.2.3.4", }, "labels": common.MapStr{ "foo": "bar", @@ -140,7 +140,7 @@ func (s *svcMeta) GenerateK8s(obj kubernetes.Resource, opts ...metadata.FieldOpt "service": common.MapStr{ "uid": string(k8sNode.GetUID()), "name": k8sNode.GetName(), - "ip": "1.2.3.4", + "ip": "1.2.3.4", }, "labels": common.MapStr{ "foo": "bar", From e702b315953c770dd1c150d2f3fea3309231d5c2 Mon Sep 17 00:00:00 2001 From: chrismark Date: Mon, 20 Sep 2021 11:55:53 +0300 Subject: [PATCH 16/16] Remove uneeded check Signed-off-by: chrismark --- .../composable/providers/kubernetes/config.go | 1 - .../composable/providers/kubernetes/pod.go | 43 ++----------------- 2 files changed, 3 insertions(+), 41 deletions(-) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index 3f3d84297ece..3d34db03a8b8 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -11,7 +11,6 @@ import ( "time" "github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata" - "github.com/elastic/beats/v7/libbeat/logp" ) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index bd0364f5797a..859f7f29dfde 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -195,13 +195,9 @@ func (p *pod) OnDelete(obj interface{}) { p.logger.Debugf("pod delete: %+v", obj) pod := obj.(*kubernetes.Pod) - // TODO: verify if we need this check here - if podTerminated(pod, getContainersInPod(pod)) { - time.AfterFunc(p.cleanupTimeout, func() { - p.emitStopped(pod) - }) - return - } + time.AfterFunc(p.cleanupTimeout, func() { + p.emitStopped(pod) + }) } func generatePodData( @@ -411,39 +407,6 @@ func (*namespacePodUpdater) OnAdd(interface{}) {} // namespace they will generate their own delete events. func (*namespacePodUpdater) OnDelete(interface{}) {} -// podTerminating returns true if a pod is marked for deletion or is in a phase beyond running. -func podTerminating(pod *kubernetes.Pod) bool { - if pod.GetObjectMeta().GetDeletionTimestamp() != nil { - return true - } - - switch pod.Status.Phase { - case kubernetes.PodRunning, kubernetes.PodPending: - default: - return true - } - - return false -} - -// podTerminated returns true if a pod is terminated, this method considers a -// pod as terminated if none of its containers are running (or going to be running). -func podTerminated(pod *kubernetes.Pod, containers []*containerInPod) bool { - // Pod is not marked for termination, so it is not terminated. - if !podTerminating(pod) { - return false - } - - // If any container is running, the pod is not terminated yet. - for _, container := range containers { - if container.status.State.Running != nil { - return false - } - } - - return true -} - // getContainersInPod returns all the containers defined in a pod and their statuses. // It includes init and ephemeral containers. func getContainersInPod(pod *kubernetes.Pod) []*containerInPod {