From e3a925cf54890a6b8de8bd84e5de162d4a56ea7d Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Sun, 18 Feb 2018 23:37:15 -0800 Subject: [PATCH 1/5] Add builder support for autodiscover and annotations builder --- .../{beater => autodiscover}/autodiscover.go | 2 +- .../builder/log_annotations/config.go | 24 +++++ .../log_annotations/log_annotations.go | 99 +++++++++++++++++++ filebeat/autodiscover/include.go | 5 + filebeat/beater/filebeat.go | 3 +- libbeat/autodiscover/autodiscover.go | 2 +- libbeat/autodiscover/autodiscover_test.go | 10 +- libbeat/autodiscover/builder.go | 78 +++++++++++++++ libbeat/autodiscover/builder/helper.go | 81 +++++++++++++++ libbeat/autodiscover/config.go | 12 ++- libbeat/autodiscover/provider.go | 39 ++++---- .../autodiscover/providers/docker/config.go | 6 +- .../autodiscover/providers/docker/docker.go | 52 +++++++--- .../providers/kubernetes/config.go | 4 - .../providers/kubernetes/kubernetes.go | 83 +++++++++------- libbeat/autodiscover/registry.go | 24 +++++ libbeat/autodiscover/template/config.go | 72 ++++++++------ libbeat/common/kubernetes/types.go | 1 + libbeat/common/kubernetes/watcher.go | 1 - .../{beater => autodiscover}/autodiscover.go | 2 +- metricbeat/beater/metricbeat.go | 3 +- 21 files changed, 483 insertions(+), 120 deletions(-) rename filebeat/{beater => autodiscover}/autodiscover.go (98%) create mode 100644 filebeat/autodiscover/builder/log_annotations/config.go create mode 100644 filebeat/autodiscover/builder/log_annotations/log_annotations.go create mode 100644 filebeat/autodiscover/include.go create mode 100644 libbeat/autodiscover/builder.go create mode 100644 libbeat/autodiscover/builder/helper.go create mode 100644 libbeat/autodiscover/registry.go rename metricbeat/{beater => autodiscover}/autodiscover.go (98%) diff --git a/filebeat/beater/autodiscover.go b/filebeat/autodiscover/autodiscover.go similarity index 98% rename from filebeat/beater/autodiscover.go rename to filebeat/autodiscover/autodiscover.go index 5957e7f8ca01..08e8c1b4d350 100644 --- a/filebeat/beater/autodiscover.go +++ b/filebeat/autodiscover/autodiscover.go @@ -1,4 +1,4 @@ -package beater +package autodiscover import ( "errors" diff --git a/filebeat/autodiscover/builder/log_annotations/config.go b/filebeat/autodiscover/builder/log_annotations/config.go new file mode 100644 index 000000000000..95dc0b81d465 --- /dev/null +++ b/filebeat/autodiscover/builder/log_annotations/config.go @@ -0,0 +1,24 @@ +package log_annotations + +import "github.com/elastic/beats/libbeat/common" + +type config struct { + Prefix string `config:"prefix"` + Config []*common.Config `config:"config"` +} + +func defaultConfig() config { + rawCfg := map[string]interface{}{ + "type": "docker", + "containers": map[string]interface{}{ + "ids": []string{ + "${data.container.id}", + }, + }, + } + cfg, _ := common.NewConfigFrom(rawCfg) + return config{ + Prefix: "co.elastic.logs", + Config: []*common.Config{cfg}, + } +} diff --git a/filebeat/autodiscover/builder/log_annotations/log_annotations.go b/filebeat/autodiscover/builder/log_annotations/log_annotations.go new file mode 100644 index 000000000000..c458f10d9ade --- /dev/null +++ b/filebeat/autodiscover/builder/log_annotations/log_annotations.go @@ -0,0 +1,99 @@ +package log_annotations + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/autodiscover" + "github.com/elastic/beats/libbeat/autodiscover/builder" + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/logp" +) + +func init() { + autodiscover.Registry.AddBuilder("log.annotations", NewLogAnnotations) +} + +type logAnnotations struct { + Prefix string + Config []*common.Config +} + +func NewLogAnnotations(cfg *common.Config) (autodiscover.Builder, error) { + config := defaultConfig() + err := cfg.Unpack(&config) + + if err != nil { + return nil, fmt.Errorf("unable to unpack log.annotations config due to error: %v", err) + } + + return &logAnnotations{config.Prefix, config.Config}, nil +} + +func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config { + var config []*common.Config + + host, _ := event["host"].(string) + if host == "" { + return config + } + + annotations, ok := event["annotations"].(map[string]string) + if !ok { + return config + } + + container, ok := event["container"].(common.MapStr) + if !ok { + return config + } + id := builder.GetContainerID(container) + + if builder.IsContainerNoOp(annotations, l.Prefix, id) == true { + return config + } + + //TODO: Add module support + + tempCfg := common.MapStr{} + multiline := l.getMultiline(annotations, id) + + for k, v := range multiline { + tempCfg.Put(k, v) + } + if includeLines := l.getIncludeLines(annotations, id); len(includeLines) != 0 { + tempCfg.Put("include_lines", includeLines) + } + if excludeLines := l.getExcludeLines(annotations, id); len(excludeLines) != 0 { + tempCfg.Put("exclude_lines", excludeLines) + } + + // Merge config template with the configs from the annotations + for _, c := range l.Config { + if err := c.Merge(tempCfg); err != nil { + logp.Debug("log.annotations", "config merge failed with error: %v", err) + } else { + cfg := common.MapStr{} + c.Unpack(cfg) + logp.Debug("log.annotations", "generated config %v", cfg.String()) + config = append(config, c) + } + } + + // Apply information in event to the template to generate the final config + config = template.ApplyConfigTemplate(event, config) + return config +} + +func (l *logAnnotations) getMultiline(annotations map[string]string, container string) map[string]string { + return builder.GetContainerAnnotationsWithPrefix(annotations, l.Prefix, container, "multiline") +} + +func (l *logAnnotations) getIncludeLines(annotations map[string]string, container string) []string { + return builder.GetContainerAnnotationsAsList(annotations, l.Prefix, container, "include_lines") +} + +func (l *logAnnotations) getExcludeLines(annotations map[string]string, container string) []string { + return builder.GetContainerAnnotationsAsList(annotations, l.Prefix, container, "exclude_lines") +} diff --git a/filebeat/autodiscover/include.go b/filebeat/autodiscover/include.go new file mode 100644 index 000000000000..21970b3ee825 --- /dev/null +++ b/filebeat/autodiscover/include.go @@ -0,0 +1,5 @@ +package autodiscover + +import ( + _ "github.com/elastic/beats/filebeat/autodiscover/builder/log_annotations" +) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 3b35fa1ca413..fc6ff35a5771 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs/elasticsearch" + fbautodiscover "github.com/elastic/beats/filebeat/autodiscover" "github.com/elastic/beats/filebeat/channel" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/crawler" @@ -295,7 +296,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { var adiscover *autodiscover.Autodiscover if fb.config.Autodiscover != nil { - adapter := NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory) + adapter := fbautodiscover.NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory) adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover) if err != nil { return err diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 37945734c274..a753350f917e 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -51,7 +51,7 @@ func NewAutodiscover(name string, adapter Adapter, config *Config) (*Autodiscove // Init providers var providers []Provider for _, providerCfg := range config.Providers { - provider, err := ProviderRegistry.BuildProvider(bus, providerCfg) + provider, err := Registry.BuildProvider(bus, providerCfg) if err != nil { return nil, err } diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index a8a5fa11a7ff..f9dfb88c2645 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -10,6 +10,8 @@ import ( "github.com/elastic/beats/libbeat/common/bus" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/autodiscover/template" ) type mockRunner struct { @@ -102,8 +104,8 @@ func TestNilAutodiscover(t *testing.T) { func TestAutodiscover(t *testing.T) { // Register mock autodiscover provider busChan := make(chan bus.Bus, 1) - ProviderRegistry = NewRegistry() - ProviderRegistry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) { + Registry = NewRegistry() + Registry.AddProvider("mock", func(b bus.Bus, mapper *template.Mapper, builders Builders, c *common.Config) (Provider, error) { // intercept bus to mock events busChan <- b @@ -205,8 +207,8 @@ func TestAutodiscoverHash(t *testing.T) { // Register mock autodiscover provider busChan := make(chan bus.Bus, 1) - ProviderRegistry = NewRegistry() - ProviderRegistry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) { + Registry = NewRegistry() + Registry.AddProvider("mock", func(b bus.Bus, mapper *template.Mapper, builders Builders, c *common.Config) (Provider, error) { // intercept bus to mock events busChan <- b diff --git a/libbeat/autodiscover/builder.go b/libbeat/autodiscover/builder.go new file mode 100644 index 000000000000..3113dc337399 --- /dev/null +++ b/libbeat/autodiscover/builder.go @@ -0,0 +1,78 @@ +package autodiscover + +import ( + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/logp" +) + +type Builder interface { + CreateConfig(event bus.Event) []*common.Config +} + +type Builders []Builder + +type BuilderConstructor func(*common.Config) (Builder, error) + +// AddBuilder registers a new BuilderConstructor +func (r *registry) AddBuilder(name string, builder BuilderConstructor) error { + r.lock.Lock() + defer r.lock.Unlock() + + if name == "" { + return fmt.Errorf("builder name is required") + } + + _, exists := r.builders[name] + if exists { + return fmt.Errorf("builder '%s' is already registered", name) + } + + if builder == nil { + return fmt.Errorf("builder '%s' cannot be registered with a nil factory", name) + } + + r.builders[name] = builder + logp.Debug(debugK, "Builder registered: %s", name) + return nil +} + +// GetBuilder returns the provider with the giving name, nil if it doesn't exist +func (r *registry) GetBuilder(name string) BuilderConstructor { + r.lock.RLock() + defer r.lock.RUnlock() + + name = strings.ToLower(name) + return r.builders[name] +} + +// ConstructBuilder reads provider configuration and instatiate one +func (r *registry) ConstructBuilder(c *common.Config) (Builder, error) { + var config BuilderConfig + err := c.Unpack(&config) + if err != nil { + return nil, err + } + + builder := r.GetBuilder(config.Type) + if builder == nil { + return nil, fmt.Errorf("Unknown autodiscover builder %s", config.Type) + } + + return builder(c) +} + +func (b Builders) GetConfig(event bus.Event) []*common.Config { + var configs []*common.Config + + for _, builder := range b { + if config := builder.CreateConfig(event); config != nil { + configs = append(configs, config...) + } + } + + return configs +} diff --git a/libbeat/autodiscover/builder/helper.go b/libbeat/autodiscover/builder/helper.go new file mode 100644 index 000000000000..7593fa6d1590 --- /dev/null +++ b/libbeat/autodiscover/builder/helper.go @@ -0,0 +1,81 @@ +package builder + +import ( + "fmt" + "strconv" + "strings" + + "github.com/elastic/beats/libbeat/common" +) + +func GetContainerID(container common.MapStr) string { + id, _ := container["id"].(string) + return id +} + +func GetAnnotationAsString(annotations map[string]string, prefix, key string) string { + value, _ := annotations[fmt.Sprintf("%s/%s", prefix, key)] + return value +} + +func GetContainerAnnotationAsString(annotations map[string]string, prefix, container, key string) string { + if value := GetAnnotationAsString(annotations, fmt.Sprintf("%s.%s", prefix, container), key); value != "" { + return value + } + return GetAnnotationAsString(annotations, prefix, key) +} + +func GetAnnotationsAsList(annotations map[string]string, prefix, key string) []string { + value := GetAnnotationAsString(annotations, prefix, key) + if value == "" { + return []string{} + } + list := strings.Split(value, ",") + + for i := 0; i < len(list); i++ { + list[i] = strings.TrimSpace(list[i]) + } + + return list +} + +func GetContainerAnnotationsAsList(annotations map[string]string, prefix, container, key string) []string { + if values := GetAnnotationsAsList(annotations, fmt.Sprintf("%s.%s", prefix, container), key); len(values) != 0 { + return values + } + return GetAnnotationsAsList(annotations, prefix, key) +} + +func IsNoOp(annotations map[string]string, prefix string) bool { + value := GetAnnotationAsString(annotations, prefix, "disable") + noop, _ := strconv.ParseBool(value) + + return noop +} + +func IsContainerNoOp(annotations map[string]string, prefix, container string) bool { + if IsNoOp(annotations, prefix) == true { + return true + } + return IsNoOp(annotations, fmt.Sprintf("%s.%s", prefix, container)) +} + +func GetAnnotationsWithPrefix(annotations map[string]string, prefix, key string) map[string]string { + result := map[string]string{} + + pref := fmt.Sprintf("%s/%s.", prefix, key) + for k, v := range annotations { + if strings.Index(k, pref) == 0 { + parts := strings.Split(k, "/") + if len(parts) == 2 { + result[parts[1]] = v + } + } + } + return result +} + +func GetContainerAnnotationsWithPrefix(annotations map[string]string, prefix, container, key string) map[string]string { + pref := fmt.Sprintf("%s.%s", prefix, container) + return GetAnnotationsWithPrefix(annotations, pref, key) +} diff --git a/libbeat/autodiscover/config.go b/libbeat/autodiscover/config.go index f3bdc1f4c719..a4c5d1469e8c 100644 --- a/libbeat/autodiscover/config.go +++ b/libbeat/autodiscover/config.go @@ -1,6 +1,9 @@ package autodiscover -import "github.com/elastic/beats/libbeat/common" +import ( + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" +) // Config settings for Autodiscover type Config struct { @@ -9,5 +12,12 @@ type Config struct { // ProviderConfig settings type ProviderConfig struct { + Type string `config:"type"` + Builders []*common.Config `config:"builders"` + Templates template.MapperSettings `config:"templates"` +} + +// BuilderConfig settings +type BuilderConfig struct { Type string `config:"type"` } diff --git a/libbeat/autodiscover/provider.go b/libbeat/autodiscover/provider.go index ac1619e13128..e21d1e1a7b89 100644 --- a/libbeat/autodiscover/provider.go +++ b/libbeat/autodiscover/provider.go @@ -3,8 +3,8 @@ package autodiscover import ( "fmt" "strings" - "sync" + "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" @@ -16,26 +16,8 @@ type Provider interface { cfgfile.Runner } -// ProviderRegistry holds all known autodiscover providers, they must be added to it to enable them for use -var ProviderRegistry = NewRegistry() - // ProviderBuilder creates a new provider based on the given config and returns it -type ProviderBuilder func(bus.Bus, *common.Config) (Provider, error) - -// Register of autodiscover providers -type registry struct { - // Lock to control concurrent read/writes - lock sync.RWMutex - // A map of provider name to ProviderBuilder. - providers map[string]ProviderBuilder -} - -// NewRegistry creates and returns a new Registry -func NewRegistry() *registry { - return ®istry{ - providers: make(map[string]ProviderBuilder, 0), - } -} +type ProviderBuilder func(bus.Bus, *template.Mapper, Builders, *common.Config) (Provider, error) // AddProvider registers a new ProviderBuilder func (r *registry) AddProvider(name string, provider ProviderBuilder) error { @@ -82,5 +64,20 @@ func (r *registry) BuildProvider(bus bus.Bus, c *common.Config) (Provider, error return nil, fmt.Errorf("Unknown autodiscover provider %s", config.Type) } - return builder(bus, c) + mapper, err := template.NewConfigMapper(config.Templates) + if err != nil { + return nil, err + } + + builders := Builders{} + for _, bCfg := range config.Builders { + builder, err := r.ConstructBuilder(bCfg) + if err != nil { + logp.Debug(debugK, "Could not generate builder due to error: %v", err) + } else { + builders = append(builders, builder) + } + } + + return builder(bus, mapper, builders, c) } diff --git a/libbeat/autodiscover/providers/docker/config.go b/libbeat/autodiscover/providers/docker/config.go index 9de1bee79d67..311cb38477e6 100644 --- a/libbeat/autodiscover/providers/docker/config.go +++ b/libbeat/autodiscover/providers/docker/config.go @@ -1,15 +1,13 @@ package docker import ( - "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common/docker" ) // Config for docker autodiscover provider type Config struct { - Host string `config:"host"` - TLS *docker.TLSConfig `config:"ssl"` - Templates template.MapperSettings `config:"templates"` + Host string `config:"host"` + TLS *docker.TLSConfig `config:"ssl"` } func defaultConfig() *Config { diff --git a/libbeat/autodiscover/providers/docker/docker.go b/libbeat/autodiscover/providers/docker/docker.go index 1b65318df2fd..9e42ceb6f725 100644 --- a/libbeat/autodiscover/providers/docker/docker.go +++ b/libbeat/autodiscover/providers/docker/docker.go @@ -10,13 +10,14 @@ import ( ) func init() { - autodiscover.ProviderRegistry.AddProvider("docker", AutodiscoverBuilder) + autodiscover.Registry.AddProvider("docker", AutodiscoverBuilder) } // Provider implements autodiscover provider for docker containers type Provider struct { config *Config bus bus.Bus + builders autodiscover.Builders watcher docker.Watcher templates *template.Mapper stop chan interface{} @@ -25,18 +26,13 @@ type Provider struct { } // AutodiscoverBuilder builds and returns an autodiscover provider -func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, error) { +func AutodiscoverBuilder(bus bus.Bus, mapper *template.Mapper, builders autodiscover.Builders, c *common.Config) (autodiscover.Provider, error) { config := defaultConfig() err := c.Unpack(&config) if err != nil { return nil, err } - mapper, err := template.NewConfigMapper(config.Templates) - if err != nil { - return nil, err - } - watcher, err := docker.NewWatcher(config.Host, config.TLS, false) if err != nil { return nil, err @@ -52,6 +48,7 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, return &Provider{ config: config, bus: bus, + builders: builders, templates: mapper, watcher: watcher, stop: make(chan interface{}), @@ -106,17 +103,21 @@ func (d *Provider) emitContainer(event bus.Event, flag string) { }, } - // Emit container info - d.publish(bus.Event{ - flag: true, - "host": host, - "docker": meta, - "meta": common.MapStr{ + // Without this check there would be overlapping configurations with and without ports. + if len(container.Ports) == 0 { + event := bus.Event{ + flag: true, + "host": host, "docker": meta, - }, - }) + "meta": common.MapStr{ + "docker": meta, + }, + } + + d.publish(event) + } - // Emit container private ports + // Emit container container and port information for _, port := range container.Ports { event := bus.Event{ flag: true, @@ -136,6 +137,25 @@ func (d *Provider) publish(event bus.Event) { // Try to match a config if config := d.templates.GetConfig(event); config != nil { event["config"] = config + } else { + // Try to build a config with enabled builders. Send a provider agnostic payload. + // Builders are Beat specific. + e := bus.Event{} + dockerMeta, _ := event["docker"].(common.MapStr) + if host, ok := event["host"]; ok { + e["host"] = host + } + if port, ok := event["port"]; ok { + e["port"] = port + } + if labels, err := dockerMeta.GetValue("docker.labels"); err == nil { + e["annotations"] = labels + } + e["container"] = dockerMeta["container"] + + if config := d.builders.GetConfig(event); config != nil { + event["config"] = config + } } d.bus.Publish(event) } diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index 7d004b829920..2c75cd9a9e63 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -2,8 +2,6 @@ package kubernetes import ( "time" - - "github.com/elastic/beats/libbeat/autodiscover/template" ) // Config for kubernetes autodiscover provider @@ -18,8 +16,6 @@ type Config struct { IncludeLabels []string `config:"include_labels"` ExcludeLabels []string `config:"exclude_labels"` IncludeAnnotations []string `config:"include_annotations"` - - Templates template.MapperSettings `config:"templates"` } func defaultConfig() *Config { diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 5fca797a73af..7d226a3f91ec 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -12,7 +12,7 @@ import ( ) func init() { - autodiscover.ProviderRegistry.AddProvider("kubernetes", AutodiscoverBuilder) + autodiscover.Registry.AddProvider("kubernetes", AutodiscoverBuilder) } // Provider implements autodiscover provider for docker containers @@ -22,21 +22,17 @@ type Provider struct { watcher kubernetes.Watcher metagen kubernetes.MetaGenerator templates *template.Mapper + builders autodiscover.Builders } // AutodiscoverBuilder builds and returns an autodiscover provider -func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, error) { +func AutodiscoverBuilder(bus bus.Bus, mapper *template.Mapper, builders autodiscover.Builders, c *common.Config) (autodiscover.Provider, error) { config := defaultConfig() err := c.Unpack(&config) if err != nil { return nil, err } - mapper, err := template.NewConfigMapper(config.Templates) - if err != nil { - return nil, err - } - client, err := kubernetes.GetKubernetesClient(config.InCluster, config.KubeConfig) if err != nil { return nil, err @@ -60,6 +56,7 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, config: config, bus: bus, templates: mapper, + builders: builders, metagen: metagen, watcher: watcher, } @@ -88,16 +85,28 @@ func (p *Provider) Start() { } func (p *Provider) emit(pod *kubernetes.Pod, flag string) { + // Emit events for all containers + p.emitEvents(pod, flag, pod.Spec.Containers, pod.Status.ContainerStatuses) + + // Emit events for all initContainers + p.emitEvents(pod, flag, pod.Spec.InitContainers, pod.Status.InitContainerStatuses) +} + +func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kubernetes.Container, + containerstatuses []kubernetes.PodContainerStatus) { host := pod.Status.PodIP - containerIDs := map[string]string{} - // Emit pod container IDs - for _, c := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) { + // Collect all container IDs from status information + containerIDs := map[string]string{} + for _, c := range containerstatuses { cid := c.GetContainerID() containerIDs[c.Name] = cid + } + // Emit container and port information + for _, c := range containers { cmeta := common.MapStr{ - "id": cid, + "id": containerIDs[c.Name], "name": c.Name, "image": c.Image, } @@ -109,32 +118,22 @@ func (p *Provider) emit(pod *kubernetes.Pod, flag string) { kubemeta := meta.Clone() kubemeta["container"] = cmeta - // Emit container info - p.publish(bus.Event{ - flag: true, - "host": host, - "kubernetes": kubemeta, - "meta": common.MapStr{ - "kubernetes": meta, - }, - }) - } + // Pass annotations to all events so that it can be used in templating and by annotation builders. + kubemeta["annotations"] = pod.GetMetadata().Annotations - // Emit pod ports - for _, c := range pod.Spec.Containers { - cmeta := common.MapStr{ - "id": containerIDs[c.Name], - "name": c.Name, - "image": c.Image, + // Without this check there would be overlapping configurations with and without ports. + if len(c.Ports) == 0 { + event := bus.Event{ + flag: true, + "host": host, + "kubernetes": kubemeta, + "meta": common.MapStr{ + "kubernetes": meta, + }, + } + p.publish(event) } - // Metadata appended to each event - meta := p.metagen.ContainerMetadata(pod, c.Name) - - // Information that can be used in discovering a workload - kubemeta := meta.Clone() - kubemeta["container"] = cmeta - for _, port := range c.Ports { event := bus.Event{ flag: true, @@ -154,6 +153,22 @@ func (p *Provider) publish(event bus.Event) { // Try to match a config if config := p.templates.GetConfig(event); config != nil { event["config"] = config + } else { + // Try to build a config with enabled builders. Send a provider agnostic payload. + // Builders are Beat specific. + e := bus.Event{} + kubeMeta, _ := event["kubernetes"].(common.MapStr) + if host, ok := event["host"]; ok { + e["host"] = host + } + if port, ok := event["port"]; ok { + e["port"] = port + } + e["annotations"] = kubeMeta["annotations"] + e["container"] = kubeMeta["container"] + if config := p.builders.GetConfig(e); config != nil { + event["config"] = config + } } p.bus.Publish(event) } diff --git a/libbeat/autodiscover/registry.go b/libbeat/autodiscover/registry.go new file mode 100644 index 000000000000..dc68f7f765be --- /dev/null +++ b/libbeat/autodiscover/registry.go @@ -0,0 +1,24 @@ +package autodiscover + +import "sync" + +// Register of autodiscover providers +type registry struct { + // Lock to control concurrent read/writes + lock sync.RWMutex + // A map of provider name to ProviderBuilder. + providers map[string]ProviderBuilder + // A map of builder name to BuilderConstructor. + builders map[string]BuilderConstructor +} + +// Registry holds all known autodiscover providers, they must be added to it to enable them for use +var Registry = NewRegistry() + +// NewRegistry creates and returns a new Registry +func NewRegistry() *registry { + return ®istry{ + providers: make(map[string]ProviderBuilder, 0), + builders: make(map[string]BuilderConstructor, 0), + } +} diff --git a/libbeat/autodiscover/template/config.go b/libbeat/autodiscover/template/config.go index 136467940913..c14bcd6bdc43 100644 --- a/libbeat/autodiscover/template/config.go +++ b/libbeat/autodiscover/template/config.go @@ -6,6 +6,8 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" + "fmt" + ucfg "github.com/elastic/go-ucfg" ) @@ -63,40 +65,50 @@ func (c *Mapper) GetConfig(event bus.Event) []*common.Config { continue } - // unpack input - vars, err := ucfg.NewFrom(map[string]interface{}{ - "data": event, - }) + configs := ApplyConfigTemplate(event, mapping.Configs) + if configs != nil { + result = append(result, configs...) + } + } + return result +} + +func ApplyConfigTemplate(event bus.Event, configs []*common.Config) []*common.Config { + var result []*common.Config + // unpack input + vars, err := ucfg.NewFrom(map[string]interface{}{ + "data": event, + }) + if err != nil { + logp.Err("Error building config: %v", err) + } + opts := []ucfg.Option{ + ucfg.PathSep("."), + ucfg.Env(vars), + ucfg.ResolveEnv, + ucfg.VarExp, + } + for _, config := range configs { + c, err := ucfg.NewFrom(config, opts...) if err != nil { - logp.Err("Error building config: %v", err) + logp.Err("Error parsing config: %v", err) + continue } - opts := []ucfg.Option{ - ucfg.PathSep("."), - ucfg.Env(vars), - ucfg.ResolveEnv, - ucfg.VarExp, + // Unpack config to process any vars in the template: + var unpacked map[string]interface{} + c.Unpack(&unpacked, opts...) + if err != nil { + logp.Err("Error unpacking config: %v", err) + continue } - for _, config := range mapping.Configs { - c, err := ucfg.NewFrom(config, opts...) - if err != nil { - logp.Err("Error parsing config: %v", err) - continue - } - // Unpack config to process any vars in the template: - var unpacked map[string]interface{} - c.Unpack(&unpacked, opts...) - if err != nil { - logp.Err("Error unpacking config: %v", err) - continue - } - // Repack again: - res, err := common.NewConfigFrom(unpacked) - if err != nil { - logp.Err("Error creating config from unpack: %v", err) - continue - } - result = append(result, res) + fmt.Println(unpacked) + // Repack again: + res, err := common.NewConfigFrom(unpacked) + if err != nil { + logp.Err("Error creating config from unpack: %v", err) + continue } + result = append(result, res) } return result } diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index 425e406784ff..2770fc9be7c5 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -68,6 +68,7 @@ type ContainerPort struct { type PodSpec struct { Containers []Container `json:"containers"` + InitContainers []Container `json:"initContainers"` DNSPolicy string `json:"dnsPolicy"` NodeName string `json:"nodeName"` RestartPolicy string `json:"restartPolicy"` diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 1f34d653e8e2..022d7f1588d5 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -114,7 +114,6 @@ func (w *watcher) sync() error { defer cancel() logp.Info("kubernetes: Performing a resource sync for %T", w.resourceList) - err := w.client.List(ctx, w.options.Namespace, w.resourceList, w.buildOpts()...) if err != nil { logp.Err("kubernetes: Performing a resource sync err %s for %T", err.Error(), w.resourceList) diff --git a/metricbeat/beater/autodiscover.go b/metricbeat/autodiscover/autodiscover.go similarity index 98% rename from metricbeat/beater/autodiscover.go rename to metricbeat/autodiscover/autodiscover.go index 503e9f4e70c4..b9ceffd98904 100644 --- a/metricbeat/beater/autodiscover.go +++ b/metricbeat/autodiscover/autodiscover.go @@ -1,4 +1,4 @@ -package beater +package autodiscover import ( "errors" diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 2188279182b2..2217ae4ac9ae 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" + mbautodiscover "github.com/elastic/beats/metricbeat/autodiscover" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" @@ -146,7 +147,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe if config.Autodiscover != nil { var err error factory := module.NewFactory(b.Publisher, metricbeat.moduleOptions...) - adapter := NewAutodiscoverAdapter(factory) + adapter := mbautodiscover.NewAutodiscoverAdapter(factory) metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", adapter, config.Autodiscover) if err != nil { return nil, err From b385b2835551a9a92d06cbf09d8fa4b045343b46 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Mon, 19 Feb 2018 15:13:15 -0800 Subject: [PATCH 2/5] Add metric.annotations support --- .../log_annotations/log_annotations.go | 11 +- libbeat/autodiscover/builder/helper.go | 5 + .../builder/metric_annotations/config.go | 11 ++ .../metric_annotations/metric_annotations.go | 141 ++++++++++++++++++ metricbeat/autodiscover/include.go | 5 + 5 files changed, 168 insertions(+), 5 deletions(-) create mode 100644 metricbeat/autodiscover/builder/metric_annotations/config.go create mode 100644 metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go create mode 100644 metricbeat/autodiscover/include.go diff --git a/filebeat/autodiscover/builder/log_annotations/log_annotations.go b/filebeat/autodiscover/builder/log_annotations/log_annotations.go index c458f10d9ade..8a4782ff5f5d 100644 --- a/filebeat/autodiscover/builder/log_annotations/log_annotations.go +++ b/filebeat/autodiscover/builder/log_annotations/log_annotations.go @@ -48,24 +48,25 @@ func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config { if !ok { return config } - id := builder.GetContainerID(container) - if builder.IsContainerNoOp(annotations, l.Prefix, id) == true { + name := builder.GetContainerName(container) + + if builder.IsContainerNoOp(annotations, l.Prefix, name) == true { return config } //TODO: Add module support tempCfg := common.MapStr{} - multiline := l.getMultiline(annotations, id) + multiline := l.getMultiline(annotations, name) for k, v := range multiline { tempCfg.Put(k, v) } - if includeLines := l.getIncludeLines(annotations, id); len(includeLines) != 0 { + if includeLines := l.getIncludeLines(annotations, name); len(includeLines) != 0 { tempCfg.Put("include_lines", includeLines) } - if excludeLines := l.getExcludeLines(annotations, id); len(excludeLines) != 0 { + if excludeLines := l.getExcludeLines(annotations, name); len(excludeLines) != 0 { tempCfg.Put("exclude_lines", excludeLines) } diff --git a/libbeat/autodiscover/builder/helper.go b/libbeat/autodiscover/builder/helper.go index 7593fa6d1590..4cd5f05a1eec 100644 --- a/libbeat/autodiscover/builder/helper.go +++ b/libbeat/autodiscover/builder/helper.go @@ -13,6 +13,11 @@ func GetContainerID(container common.MapStr) string { return id } +func GetContainerName(container common.MapStr) string { + name, _ := container["name"].(string) + return name +} + func GetAnnotationAsString(annotations map[string]string, prefix, key string) string { value, _ := annotations[fmt.Sprintf("%s/%s", prefix, key)] return value diff --git a/metricbeat/autodiscover/builder/metric_annotations/config.go b/metricbeat/autodiscover/builder/metric_annotations/config.go new file mode 100644 index 000000000000..c3a2db7146fd --- /dev/null +++ b/metricbeat/autodiscover/builder/metric_annotations/config.go @@ -0,0 +1,11 @@ +package metric_annotations + +type config struct { + Prefix string `config:"prefix"` +} + +func defaultConfig() config { + return config{ + Prefix: "co.elastic.metrics", + } +} diff --git a/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go b/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go new file mode 100644 index 000000000000..940783aea6b4 --- /dev/null +++ b/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go @@ -0,0 +1,141 @@ +package metric_annotations + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/autodiscover" + "github.com/elastic/beats/libbeat/autodiscover/builder" + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" +) + +func init() { + autodiscover.Registry.AddBuilder("metric.annotations", NewMetricAnnotations) +} + +const ( + module = "module" + namespace = "namespace" + hosts = "hosts" + metricsets = "metricsets" + period = "period" + timeout = "timeout" + ssl = "ssl" + + default_timeout = "3s" + default_interval = "1m" +) + +type metricAnnotations struct { + Prefix string +} + +func NewMetricAnnotations(cfg *common.Config) (autodiscover.Builder, error) { + config := defaultConfig() + err := cfg.Unpack(&config) + + if err != nil { + return nil, fmt.Errorf("unable to unpack metric.annotations config due to error: %v", err) + } + + return &metricAnnotations{config.Prefix}, nil +} + +func (m *metricAnnotations) CreateConfig(event bus.Event) []*common.Config { + var config []*common.Config + host, _ := event["host"].(string) + if host == "" { + return config + } + + annotations, ok := event["annotations"].(map[string]string) + if !ok { + return config + } + + container, ok := event["container"].(common.MapStr) + if !ok { + return config + } + + name := builder.GetContainerName(container) + + mod := builder.GetContainerAnnotationAsString(annotations, m.Prefix, name, module) + if mod == "" { + return config + } + + hsts := builder.GetContainerAnnotationAsString(annotations, m.Prefix, name, hosts) + ns := builder.GetContainerAnnotationAsString(annotations, m.Prefix, name, namespace) + msets := m.getMetricSets(annotations, name, mod) + tout := m.getTimeout(annotations, name) + ival := m.getPeriod(annotations, name) + + sslConf := builder.GetContainerAnnotationsWithPrefix(annotations, m.Prefix, name, ssl) + + moduleConfig := common.MapStr{ + "module": mod, + "metricsets": msets, + "hosts": hsts, + "timeout": tout, + "period": ival, + "enabled": true, + } + + if ns != "" { + moduleConfig["namespace"] = ns + } + + for k, v := range sslConf { + moduleConfig.Put(k, v) + } + logp.Debug("metric.annotations", "generated config: %v", moduleConfig.String()) + + // Create config object + cfg, err := common.NewConfigFrom(moduleConfig) + if err != nil { + logp.Debug("metric.annotations", "config merge failed with error: %v", err) + } + config = append(config, cfg) + + // Apply information in event to the template to generate the final config + // This especially helps in a scenario where endpoints are configured as: + // co.elastic.metrics/hosts= "${data.host}:9090" + config = template.ApplyConfigTemplate(event, config) + return config +} + +func (m *metricAnnotations) getMetricSets(annotations map[string]string, container, module string) []string { + var msets []string + msets = builder.GetContainerAnnotationsAsList(annotations, m.Prefix, container, metricsets) + + if len(msets) == 0 { + // Special handling for prometheus as most use cases rely on exporters/instrumentation. + // Prometheus stats can be explicitly configured if need be. + if module == "prometheus" { + return []string{"collector"} + } else { + msets = mb.Registry.MetricSets(module) + } + } + return msets +} + +func (m *metricAnnotations) getPeriod(annotations map[string]string, container string) string { + if ival := builder.GetContainerAnnotationAsString(annotations, m.Prefix, container, period); ival != "" { + return ival + } else { + return default_interval + } +} + +func (m *metricAnnotations) getTimeout(annotations map[string]string, container string) string { + if tout := builder.GetContainerAnnotationAsString(annotations, m.Prefix, container, timeout); tout != "" { + return tout + } else { + return default_timeout + } +} diff --git a/metricbeat/autodiscover/include.go b/metricbeat/autodiscover/include.go new file mode 100644 index 000000000000..6d94e7a0bf8f --- /dev/null +++ b/metricbeat/autodiscover/include.go @@ -0,0 +1,5 @@ +package autodiscover + +import ( + _ "github.com/elastic/beats/metricbeat/autodiscover/builder/metric_annotations" +) From bb4b8cda736faa48f5cdaf6e549367c9569b5d9b Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Tue, 20 Feb 2018 00:48:28 -0800 Subject: [PATCH 3/5] Add hint support for builders --- .../builder/log_annotations/config.go | 6 +- .../log_annotations/log_annotations.go | 38 +++--- filebeat/autodiscover/include.go | 1 + libbeat/autodiscover/builder.go | 5 + libbeat/autodiscover/builder/helper.go | 108 +++++++++++------- .../autodiscover/providers/docker/config.go | 15 ++- .../autodiscover/providers/docker/docker.go | 9 +- .../providers/kubernetes/config.go | 10 ++ .../providers/kubernetes/kubernetes.go | 42 +++++-- libbeat/common/kubernetes/types.go | 12 +- .../builder/metric_annotations/config.go | 4 +- .../metric_annotations/metric_annotations.go | 75 ++++++++---- metricbeat/autodiscover/include.go | 1 + 13 files changed, 216 insertions(+), 110 deletions(-) diff --git a/filebeat/autodiscover/builder/log_annotations/config.go b/filebeat/autodiscover/builder/log_annotations/config.go index 95dc0b81d465..300c976f904c 100644 --- a/filebeat/autodiscover/builder/log_annotations/config.go +++ b/filebeat/autodiscover/builder/log_annotations/config.go @@ -3,7 +3,7 @@ package log_annotations import "github.com/elastic/beats/libbeat/common" type config struct { - Prefix string `config:"prefix"` + Key string `config:"key"` Config []*common.Config `config:"config"` } @@ -12,13 +12,13 @@ func defaultConfig() config { "type": "docker", "containers": map[string]interface{}{ "ids": []string{ - "${data.container.id}", + "${data.docker.container.id}", }, }, } cfg, _ := common.NewConfigFrom(rawCfg) return config{ - Prefix: "co.elastic.logs", + Key: "logs", Config: []*common.Config{cfg}, } } diff --git a/filebeat/autodiscover/builder/log_annotations/log_annotations.go b/filebeat/autodiscover/builder/log_annotations/log_annotations.go index 8a4782ff5f5d..2a87e5087398 100644 --- a/filebeat/autodiscover/builder/log_annotations/log_annotations.go +++ b/filebeat/autodiscover/builder/log_annotations/log_annotations.go @@ -16,10 +16,11 @@ func init() { } type logAnnotations struct { - Prefix string + Key string Config []*common.Config } +// Construct a log annotations builder func NewLogAnnotations(cfg *common.Config) (autodiscover.Builder, error) { config := defaultConfig() err := cfg.Unpack(&config) @@ -28,9 +29,10 @@ func NewLogAnnotations(cfg *common.Config) (autodiscover.Builder, error) { return nil, fmt.Errorf("unable to unpack log.annotations config due to error: %v", err) } - return &logAnnotations{config.Prefix, config.Config}, nil + return &logAnnotations{config.Key, config.Config}, nil } +// Create config based on input hints in the bus event func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config { var config []*common.Config @@ -39,34 +41,30 @@ func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config { return config } - annotations, ok := event["annotations"].(map[string]string) + var hints common.MapStr + hIface, ok := event["hints"] if !ok { return config + } else { + hints, _ = hIface.(common.MapStr) } - container, ok := event["container"].(common.MapStr) - if !ok { - return config - } - - name := builder.GetContainerName(container) - - if builder.IsContainerNoOp(annotations, l.Prefix, name) == true { + if builder.IsNoOp(hints, l.Key) == true { return config } //TODO: Add module support tempCfg := common.MapStr{} - multiline := l.getMultiline(annotations, name) + multiline := l.getMultiline(hints) for k, v := range multiline { tempCfg.Put(k, v) } - if includeLines := l.getIncludeLines(annotations, name); len(includeLines) != 0 { + if includeLines := l.getIncludeLines(hints); len(includeLines) != 0 { tempCfg.Put("include_lines", includeLines) } - if excludeLines := l.getExcludeLines(annotations, name); len(excludeLines) != 0 { + if excludeLines := l.getExcludeLines(hints); len(excludeLines) != 0 { tempCfg.Put("exclude_lines", excludeLines) } @@ -87,14 +85,14 @@ func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config { return config } -func (l *logAnnotations) getMultiline(annotations map[string]string, container string) map[string]string { - return builder.GetContainerAnnotationsWithPrefix(annotations, l.Prefix, container, "multiline") +func (l *logAnnotations) getMultiline(hints common.MapStr) common.MapStr { + return builder.GetHintMapStr(hints, l.Key, "multiline") } -func (l *logAnnotations) getIncludeLines(annotations map[string]string, container string) []string { - return builder.GetContainerAnnotationsAsList(annotations, l.Prefix, container, "include_lines") +func (l *logAnnotations) getIncludeLines(hints common.MapStr) []string { + return builder.GetHintAsList(hints, l.Key, "include_lines") } -func (l *logAnnotations) getExcludeLines(annotations map[string]string, container string) []string { - return builder.GetContainerAnnotationsAsList(annotations, l.Prefix, container, "exclude_lines") +func (l *logAnnotations) getExcludeLines(hints common.MapStr) []string { + return builder.GetHintAsList(hints, l.Key, "exclude_lines") } diff --git a/filebeat/autodiscover/include.go b/filebeat/autodiscover/include.go index 21970b3ee825..d610b5c4230c 100644 --- a/filebeat/autodiscover/include.go +++ b/filebeat/autodiscover/include.go @@ -1,5 +1,6 @@ package autodiscover import ( + // include all filebeat specific builders _ "github.com/elastic/beats/filebeat/autodiscover/builder/log_annotations" ) diff --git a/libbeat/autodiscover/builder.go b/libbeat/autodiscover/builder.go index 3113dc337399..5659faec4a9f 100644 --- a/libbeat/autodiscover/builder.go +++ b/libbeat/autodiscover/builder.go @@ -9,12 +9,16 @@ import ( "github.com/elastic/beats/libbeat/logp" ) +// Builder provides an interface by which configs can be built from provider metadata type Builder interface { + // CreateConfig creates a config from hints passed from providers CreateConfig(event bus.Event) []*common.Config } +// Builders is a list of Builder objects type Builders []Builder +// BuilderConstructor is a func used to generate a Builder object type BuilderConstructor func(*common.Config) (Builder, error) // AddBuilder registers a new BuilderConstructor @@ -65,6 +69,7 @@ func (r *registry) ConstructBuilder(c *common.Config) (Builder, error) { return builder(c) } +// GetConfig creates configs for all builders initalized. func (b Builders) GetConfig(event bus.Event) []*common.Config { var configs []*common.Config diff --git a/libbeat/autodiscover/builder/helper.go b/libbeat/autodiscover/builder/helper.go index 4cd5f05a1eec..0a4f919eeb87 100644 --- a/libbeat/autodiscover/builder/helper.go +++ b/libbeat/autodiscover/builder/helper.go @@ -8,34 +8,54 @@ import ( "github.com/elastic/beats/libbeat/common" ) +// GetContainerID returns the id of a container func GetContainerID(container common.MapStr) string { id, _ := container["id"].(string) return id } +// GetContainerName returns the name of a container func GetContainerName(container common.MapStr) string { name, _ := container["name"].(string) return name } -func GetAnnotationAsString(annotations map[string]string, prefix, key string) string { - value, _ := annotations[fmt.Sprintf("%s/%s", prefix, key)] - return value +// GetHintString takes a hint and returns its value as a string +func GetHintString(hints common.MapStr, key, config string) string { + if iface, err := hints.GetValue(fmt.Sprintf("%s.%s", key, config)); err == nil { + if str, ok := iface.(string); ok { + return str + } + } + + return "" } -func GetContainerAnnotationAsString(annotations map[string]string, prefix, container, key string) string { - if value := GetAnnotationAsString(annotations, fmt.Sprintf("%s.%s", prefix, container), key); value != "" { - return value +// GetHintMapStr takes a hint and returns a MapStr +func GetHintMapStr(hints common.MapStr, key, config string) common.MapStr { + if iface, err := hints.GetValue(fmt.Sprintf("%s.%s", key, config)); err == nil { + if mapstr, ok := iface.(common.MapStr); ok { + return mapstr + } } - return GetAnnotationAsString(annotations, prefix, key) + + return nil +} + +// GetHintAsList takes a hint and returns the value as lists. +func GetHintAsList(hints common.MapStr, key, config string) []string { + if str := GetHintString(hints, key, config); str != "" { + return getStringAsList(str) + } + + return nil } -func GetAnnotationsAsList(annotations map[string]string, prefix, key string) []string { - value := GetAnnotationAsString(annotations, prefix, key) - if value == "" { +func getStringAsList(input string) []string { + if input == "" { return []string{} } - list := strings.Split(value, ",") + list := strings.Split(input, ",") for i := 0; i < len(list); i++ { list[i] = strings.TrimSpace(list[i]) @@ -44,43 +64,47 @@ func GetAnnotationsAsList(annotations map[string]string, prefix, key string) []s return list } -func GetContainerAnnotationsAsList(annotations map[string]string, prefix, container, key string) []string { - if values := GetAnnotationsAsList(annotations, fmt.Sprintf("%s.%s", prefix, container), key); len(values) != 0 { - return values +// IsNoOp is a big red button to prevent spinning up Runners in case of issues. +func IsNoOp(hints common.MapStr, key string) bool { + if value, err := hints.GetValue(fmt.Sprintf("%s.disable", key)); err == nil { + noop, _ := strconv.ParseBool(value.(string)) + return noop } - return GetAnnotationsAsList(annotations, prefix, key) -} - -func IsNoOp(annotations map[string]string, prefix string) bool { - value := GetAnnotationAsString(annotations, prefix, "disable") - noop, _ := strconv.ParseBool(value) - - return noop -} -func IsContainerNoOp(annotations map[string]string, prefix, container string) bool { - if IsNoOp(annotations, prefix) == true { - return true - } - return IsNoOp(annotations, fmt.Sprintf("%s.%s", prefix, container)) + return false } -func GetAnnotationsWithPrefix(annotations map[string]string, prefix, key string) map[string]string { - result := map[string]string{} - - pref := fmt.Sprintf("%s/%s.", prefix, key) - for k, v := range annotations { - if strings.Index(k, pref) == 0 { - parts := strings.Split(k, "/") - if len(parts) == 2 { - result[parts[1]] = v +// GenerateHints parses annotations based on a prefix and sets up hints that can be picked up by individual Beats. +func GenerateHints(annotations map[string]string, container, prefix string) common.MapStr { + hints := common.MapStr{} + plen := len(prefix) + + for key, value := range annotations { + // Filter out all annotations which start with the prefix + if strings.Index(key, prefix) == 0 { + subKey := key[plen:] + // Split an annotation by /. Ex co.elastic.metrics/module would split to ["metrics", "module"] + // part[0] would give the type of config and part[1] would give the config entry + parts := strings.Split(subKey, "/") + if len(parts) == 0 || parts[0] == "" { + continue + } + // tc stands for type and container + // Split part[0] to get the builder type and the container if it exists + tc := strings.Split(parts[0], ".") + k := fmt.Sprintf("%s.%s", tc[0], parts[1]) + if len(tc) == 2 && container != "" && tc[1] == container { + // Container specific properties always carry higher preference. + // Overwrite properties even if they exist. + hints.Put(k, value) + } else { + // Only insert the config if it doesn't already exist + if _, err := hints.GetValue(k); err != nil { + hints.Put(k, value) + } } } } - return result -} -func GetContainerAnnotationsWithPrefix(annotations map[string]string, prefix, container, key string) map[string]string { - pref := fmt.Sprintf("%s.%s", prefix, container) - return GetAnnotationsWithPrefix(annotations, pref, key) + return hints } diff --git a/libbeat/autodiscover/providers/docker/config.go b/libbeat/autodiscover/providers/docker/config.go index 311cb38477e6..2ae43667d946 100644 --- a/libbeat/autodiscover/providers/docker/config.go +++ b/libbeat/autodiscover/providers/docker/config.go @@ -6,12 +6,21 @@ import ( // Config for docker autodiscover provider type Config struct { - Host string `config:"host"` - TLS *docker.TLSConfig `config:"ssl"` + Host string `config:"host"` + TLS *docker.TLSConfig `config:"ssl"` + Prefix string `config:"string"` } func defaultConfig() *Config { return &Config{ - Host: "unix:///var/run/docker.sock", + Host: "unix:///var/run/docker.sock", + Prefix: "co.elastic.", + } +} + +func (c *Config) Validate() { + // Make sure that prefix ends with a '.' + if c.Prefix[len(c.Prefix)-1] != '.' { + c.Prefix = c.Prefix + "." } } diff --git a/libbeat/autodiscover/providers/docker/docker.go b/libbeat/autodiscover/providers/docker/docker.go index 9e42ceb6f725..aabdbe14c238 100644 --- a/libbeat/autodiscover/providers/docker/docker.go +++ b/libbeat/autodiscover/providers/docker/docker.go @@ -2,6 +2,7 @@ package docker import ( "github.com/elastic/beats/libbeat/autodiscover" + "github.com/elastic/beats/libbeat/autodiscover/builder" "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" @@ -142,6 +143,8 @@ func (d *Provider) publish(event bus.Event) { // Builders are Beat specific. e := bus.Event{} dockerMeta, _ := event["docker"].(common.MapStr) + e["docker"] = dockerMeta + if host, ok := event["host"]; ok { e["host"] = host } @@ -149,11 +152,11 @@ func (d *Provider) publish(event bus.Event) { e["port"] = port } if labels, err := dockerMeta.GetValue("docker.labels"); err == nil { - e["annotations"] = labels + hints := builder.GenerateHints(labels.(map[string]string), "", d.config.Prefix) + e["hints"] = hints } - e["container"] = dockerMeta["container"] - if config := d.builders.GetConfig(event); config != nil { + if config := d.builders.GetConfig(e); config != nil { event["config"] = config } } diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index 2c75cd9a9e63..27e771174ca0 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -16,6 +16,8 @@ type Config struct { IncludeLabels []string `config:"include_labels"` ExcludeLabels []string `config:"exclude_labels"` IncludeAnnotations []string `config:"include_annotations"` + + Prefix string `config:"prefix"` } func defaultConfig() *Config { @@ -23,5 +25,13 @@ func defaultConfig() *Config { InCluster: true, SyncPeriod: 1 * time.Second, CleanupTimeout: 60 * time.Second, + Prefix: "co.elastic.", + } +} + +func (c *Config) Validate() { + // Make sure that prefix ends with a '.' + if c.Prefix[len(c.Prefix)-1] != '.' { + c.Prefix = c.Prefix + "." } } diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 7d226a3f91ec..39077f9c4049 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -4,6 +4,7 @@ import ( "time" "github.com/elastic/beats/libbeat/autodiscover" + "github.com/elastic/beats/libbeat/autodiscover/builder" "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" @@ -96,22 +97,23 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kub containerstatuses []kubernetes.PodContainerStatus) { host := pod.Status.PodIP - // Collect all container IDs from status information + // Collect all container IDs and runtimes from status information. Kubernetes has both docker and rkt containerIDs := map[string]string{} + runtimes := map[string]string{} for _, c := range containerstatuses { - cid := c.GetContainerID() + cid, runtime := c.GetContainerIDWithRuntime() containerIDs[c.Name] = cid + runtimes[c.Name] = runtime } // Emit container and port information for _, c := range containers { cmeta := common.MapStr{ - "id": containerIDs[c.Name], - "name": c.Name, - "image": c.Image, + "id": containerIDs[c.Name], + "name": c.Name, + "image": c.Image, + "runtime": runtimes[c.Name], } - - // Metadata appended to each event meta := p.metagen.ContainerMetadata(pod, c.Name) // Information that can be used in discovering a workload @@ -164,11 +166,33 @@ func (p *Provider) publish(event bus.Event) { if port, ok := event["port"]; ok { e["port"] = port } - e["annotations"] = kubeMeta["annotations"] - e["container"] = kubeMeta["container"] + // The builder base config can configure any of the field values of kubernetes if need be. + e["kubernetes"] = kubeMeta + + annotations, _ := kubeMeta["annotations"].(map[string]string) + container, _ := kubeMeta["container"].(common.MapStr) + + // This would end up adding a docker|rkt.container entry into the event. This would make sure + // that there is not an attempt to spin up a docker input for a rkt container and when a + // rkt input exists it would be natively supported. + if runtime, ok := container["runtime"]; ok { + e[runtime.(string)] = common.MapStr{ + "container": container, + } + } + + cname := builder.GetContainerName(container) + hints := builder.GenerateHints(annotations, cname, p.config.Prefix) + if len(hints) != 0 { + e["hints"] = hints + } + + logp.Debug("kubernetes", "Generated builder event %v", event) + if config := p.builders.GetConfig(e); config != nil { event["config"] = config } + } p.bus.Publish(event) } diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index 2770fc9be7c5..7929cc076be5 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -133,14 +133,20 @@ func (p *Pod) GetMetadata() *ObjectMeta { // GetContainerID parses the container ID to get the actual ID string func (s *PodContainerStatus) GetContainerID() string { + cID, _ := s.GetContainerIDWithRuntime() + return cID +} + +// GetContainerID parses the container ID to get the actual ID string +func (s *PodContainerStatus) GetContainerIDWithRuntime() (string, string) { cID := s.ContainerID if cID != "" { - parts := strings.Split(cID, "//") + parts := strings.Split(cID, "://") if len(parts) == 2 { - return parts[1] + return parts[1], parts[0] } } - return "" + return "", "" } // Event is kubernetes event diff --git a/metricbeat/autodiscover/builder/metric_annotations/config.go b/metricbeat/autodiscover/builder/metric_annotations/config.go index c3a2db7146fd..17519fcf0def 100644 --- a/metricbeat/autodiscover/builder/metric_annotations/config.go +++ b/metricbeat/autodiscover/builder/metric_annotations/config.go @@ -1,11 +1,11 @@ package metric_annotations type config struct { - Prefix string `config:"prefix"` + Key string `config:"key"` } func defaultConfig() config { return config{ - Prefix: "co.elastic.metrics", + Key: "metrics", } } diff --git a/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go b/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go index 940783aea6b4..a1fa64e8123c 100644 --- a/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go +++ b/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go @@ -3,6 +3,8 @@ package metric_annotations import ( "fmt" + "strings" + "github.com/elastic/beats/libbeat/autodiscover" "github.com/elastic/beats/libbeat/autodiscover/builder" "github.com/elastic/beats/libbeat/autodiscover/template" @@ -30,9 +32,10 @@ const ( ) type metricAnnotations struct { - Prefix string + Key string } +// Build a new metrics annotation builder func NewMetricAnnotations(cfg *common.Config) (autodiscover.Builder, error) { config := defaultConfig() err := cfg.Unpack(&config) @@ -41,9 +44,10 @@ func NewMetricAnnotations(cfg *common.Config) (autodiscover.Builder, error) { return nil, fmt.Errorf("unable to unpack metric.annotations config due to error: %v", err) } - return &metricAnnotations{config.Prefix}, nil + return &metricAnnotations{config.Key}, nil } +// Create configs based on hints passed from providers func (m *metricAnnotations) CreateConfig(event bus.Event) []*common.Config { var config []*common.Config host, _ := event["host"].(string) @@ -51,30 +55,24 @@ func (m *metricAnnotations) CreateConfig(event bus.Event) []*common.Config { return config } - annotations, ok := event["annotations"].(map[string]string) - if !ok { - return config - } + port, _ := event["port"].(int64) - container, ok := event["container"].(common.MapStr) + hints, ok := event["hints"].(common.MapStr) if !ok { return config } - name := builder.GetContainerName(container) - - mod := builder.GetContainerAnnotationAsString(annotations, m.Prefix, name, module) + mod := m.getModule(hints) if mod == "" { return config } - hsts := builder.GetContainerAnnotationAsString(annotations, m.Prefix, name, hosts) - ns := builder.GetContainerAnnotationAsString(annotations, m.Prefix, name, namespace) - msets := m.getMetricSets(annotations, name, mod) - tout := m.getTimeout(annotations, name) - ival := m.getPeriod(annotations, name) - - sslConf := builder.GetContainerAnnotationsWithPrefix(annotations, m.Prefix, name, ssl) + hsts := m.getHostsWithPort(hints, port) + ns := m.getNamespace(hints) + msets := m.getMetricSets(hints, mod) + tout := m.getTimeout(hints) + ival := m.getPeriod(hints) + sslConf := m.getSSLConfig(hints) moduleConfig := common.MapStr{ "module": mod, @@ -92,12 +90,12 @@ func (m *metricAnnotations) CreateConfig(event bus.Event) []*common.Config { for k, v := range sslConf { moduleConfig.Put(k, v) } - logp.Debug("metric.annotations", "generated config: %v", moduleConfig.String()) + logp.Debug("metric.hints", "generated config: %v", moduleConfig.String()) // Create config object cfg, err := common.NewConfigFrom(moduleConfig) if err != nil { - logp.Debug("metric.annotations", "config merge failed with error: %v", err) + logp.Debug("metric.hints", "config merge failed with error: %v", err) } config = append(config, cfg) @@ -108,9 +106,13 @@ func (m *metricAnnotations) CreateConfig(event bus.Event) []*common.Config { return config } -func (m *metricAnnotations) getMetricSets(annotations map[string]string, container, module string) []string { +func (m *metricAnnotations) getModule(hints common.MapStr) string { + return builder.GetHintString(hints, m.Key, module) +} + +func (m *metricAnnotations) getMetricSets(hints common.MapStr, module string) []string { var msets []string - msets = builder.GetContainerAnnotationsAsList(annotations, m.Prefix, container, metricsets) + msets = builder.GetHintAsList(hints, m.Key, metricsets) if len(msets) == 0 { // Special handling for prometheus as most use cases rely on exporters/instrumentation. @@ -124,18 +126,41 @@ func (m *metricAnnotations) getMetricSets(annotations map[string]string, contain return msets } -func (m *metricAnnotations) getPeriod(annotations map[string]string, container string) string { - if ival := builder.GetContainerAnnotationAsString(annotations, m.Prefix, container, period); ival != "" { +func (m *metricAnnotations) getHostsWithPort(hints common.MapStr, port int64) []string { + var result []string + thosts := builder.GetHintAsList(hints, m.Key, hosts) + + // Only pick hosts that have ${data.port} or the port on current event. This will make + // sure that incorrect meta mapping doesn't happen + for _, h := range thosts { + if strings.Contains(h, "data.port") || strings.Contains(h, fmt.Sprintf("%d", port)) { + result = append(result, h) + } + } + + return result +} + +func (m *metricAnnotations) getNamespace(hints common.MapStr) string { + return builder.GetHintString(hints, m.Key, namespace) +} + +func (m *metricAnnotations) getPeriod(hints common.MapStr) string { + if ival := builder.GetHintString(hints, m.Key, period); ival != "" { return ival } else { return default_interval } } -func (m *metricAnnotations) getTimeout(annotations map[string]string, container string) string { - if tout := builder.GetContainerAnnotationAsString(annotations, m.Prefix, container, timeout); tout != "" { +func (m *metricAnnotations) getTimeout(hints common.MapStr) string { + if tout := builder.GetHintString(hints, m.Key, timeout); tout != "" { return tout } else { return default_timeout } } + +func (m *metricAnnotations) getSSLConfig(hints common.MapStr) common.MapStr { + return builder.GetHintMapStr(hints, m.Key, ssl) +} diff --git a/metricbeat/autodiscover/include.go b/metricbeat/autodiscover/include.go index 6d94e7a0bf8f..4714c3c55b87 100644 --- a/metricbeat/autodiscover/include.go +++ b/metricbeat/autodiscover/include.go @@ -1,5 +1,6 @@ package autodiscover import ( + // include all filebeat specific builders _ "github.com/elastic/beats/metricbeat/autodiscover/builder/metric_annotations" ) From dd889e95f7a1bc85e709cfdc2c3b3b79e332db33 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Wed, 21 Feb 2018 00:15:29 -0800 Subject: [PATCH 4/5] Adding test cases and incorporate review comments --- CHANGELOG.asciidoc | 1 + .../{log_annotations => logs}/config.go | 2 +- .../log_annotations.go => logs/logs.go} | 43 ++--- .../autodiscover/builder/logs/logs_test.go | 145 ++++++++++++++++ filebeat/autodiscover/include.go | 2 +- libbeat/autodiscover/autodiscover_test.go | 6 +- libbeat/autodiscover/builder.go | 4 +- libbeat/autodiscover/builder/helper_test.go | 52 ++++++ libbeat/autodiscover/builder_test.go | 57 +++++++ libbeat/autodiscover/config.go | 5 +- libbeat/autodiscover/provider.go | 20 +-- .../autodiscover/providers/docker/config.go | 11 +- .../autodiscover/providers/docker/docker.go | 67 +++++--- .../docker/docker_integration_test.go | 2 +- .../providers/docker/docker_test.go | 85 ++++++++++ .../providers/kubernetes/config.go | 8 +- .../providers/kubernetes/kubernetes.go | 97 ++++++----- .../providers/kubernetes/kubernetes_test.go | 128 +++++++++++++++ libbeat/autodiscover/template/config.go | 4 +- libbeat/common/kubernetes/types.go | 12 +- libbeat/common/kubernetes/types_test.go | 37 +++++ .../{metric_annotations => metrics}/config.go | 2 +- .../metrics.go} | 26 +-- .../builder/metrics/metrics_test.go | 155 ++++++++++++++++++ metricbeat/autodiscover/include.go | 2 +- 25 files changed, 826 insertions(+), 147 deletions(-) rename filebeat/autodiscover/builder/{log_annotations => logs}/config.go (95%) rename filebeat/autodiscover/builder/{log_annotations/log_annotations.go => logs/logs.go} (66%) create mode 100644 filebeat/autodiscover/builder/logs/logs_test.go create mode 100644 libbeat/autodiscover/builder/helper_test.go create mode 100644 libbeat/autodiscover/builder_test.go create mode 100644 libbeat/autodiscover/providers/docker/docker_test.go create mode 100644 libbeat/autodiscover/providers/kubernetes/kubernetes_test.go create mode 100644 libbeat/common/kubernetes/types_test.go rename metricbeat/autodiscover/builder/{metric_annotations => metrics}/config.go (81%) rename metricbeat/autodiscover/builder/{metric_annotations/metric_annotations.go => metrics/metrics.go} (88%) create mode 100644 metricbeat/autodiscover/builder/metrics/metrics_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index fdf7d3f6ef10..f0f1e82a4553 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -177,6 +177,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - The `add_docker_metadata` and `add_kubernetes_metadata` processors are now GA, instead of Beta. {pull}6105[6105] - The node name can be discovered automatically by machine-id matching when beat deployed outside kubernetes cluster. {pull}6146[6146] - Panics will be written to the logger before exiting. {pull}6199[6199] +- Add builder support for autodiscover and annotations builder {pull}6408[6408] *Auditbeat* diff --git a/filebeat/autodiscover/builder/log_annotations/config.go b/filebeat/autodiscover/builder/logs/config.go similarity index 95% rename from filebeat/autodiscover/builder/log_annotations/config.go rename to filebeat/autodiscover/builder/logs/config.go index 300c976f904c..8b7f44bbf17c 100644 --- a/filebeat/autodiscover/builder/log_annotations/config.go +++ b/filebeat/autodiscover/builder/logs/config.go @@ -1,4 +1,4 @@ -package log_annotations +package logs import "github.com/elastic/beats/libbeat/common" diff --git a/filebeat/autodiscover/builder/log_annotations/log_annotations.go b/filebeat/autodiscover/builder/logs/logs.go similarity index 66% rename from filebeat/autodiscover/builder/log_annotations/log_annotations.go rename to filebeat/autodiscover/builder/logs/logs.go index 2a87e5087398..79528d700306 100644 --- a/filebeat/autodiscover/builder/log_annotations/log_annotations.go +++ b/filebeat/autodiscover/builder/logs/logs.go @@ -1,4 +1,4 @@ -package log_annotations +package logs import ( "fmt" @@ -12,15 +12,21 @@ import ( ) func init() { - autodiscover.Registry.AddBuilder("log.annotations", NewLogAnnotations) + autodiscover.Registry.AddBuilder("logs", NewLogAnnotations) } +const ( + multiline = "multiline" + includeLines = "include_lines" + excludeLines = "exclude_lines" +) + type logAnnotations struct { Key string Config []*common.Config } -// Construct a log annotations builder +// NewLogAnnotations builds a log annotations builder func NewLogAnnotations(cfg *common.Config) (autodiscover.Builder, error) { config := defaultConfig() err := cfg.Unpack(&config) @@ -43,9 +49,7 @@ func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config { var hints common.MapStr hIface, ok := event["hints"] - if !ok { - return config - } else { + if ok { hints, _ = hIface.(common.MapStr) } @@ -56,26 +60,23 @@ func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config { //TODO: Add module support tempCfg := common.MapStr{} - multiline := l.getMultiline(hints) - - for k, v := range multiline { - tempCfg.Put(k, v) + mline := l.getMultiline(hints) + if len(mline) != 0 { + tempCfg.Put(multiline, mline) } - if includeLines := l.getIncludeLines(hints); len(includeLines) != 0 { - tempCfg.Put("include_lines", includeLines) + if ilines := l.getIncludeLines(hints); len(ilines) != 0 { + tempCfg.Put(includeLines, ilines) } - if excludeLines := l.getExcludeLines(hints); len(excludeLines) != 0 { - tempCfg.Put("exclude_lines", excludeLines) + if elines := l.getExcludeLines(hints); len(elines) != 0 { + tempCfg.Put(excludeLines, elines) } // Merge config template with the configs from the annotations for _, c := range l.Config { if err := c.Merge(tempCfg); err != nil { - logp.Debug("log.annotations", "config merge failed with error: %v", err) + logp.Debug("logs.builder", "config merge failed with error: %v", err) } else { - cfg := common.MapStr{} - c.Unpack(cfg) - logp.Debug("log.annotations", "generated config %v", cfg.String()) + logp.Debug("logs.builder", "generated config %v", *c) config = append(config, c) } } @@ -86,13 +87,13 @@ func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config { } func (l *logAnnotations) getMultiline(hints common.MapStr) common.MapStr { - return builder.GetHintMapStr(hints, l.Key, "multiline") + return builder.GetHintMapStr(hints, l.Key, multiline) } func (l *logAnnotations) getIncludeLines(hints common.MapStr) []string { - return builder.GetHintAsList(hints, l.Key, "include_lines") + return builder.GetHintAsList(hints, l.Key, includeLines) } func (l *logAnnotations) getExcludeLines(hints common.MapStr) []string { - return builder.GetHintAsList(hints, l.Key, "exclude_lines") + return builder.GetHintAsList(hints, l.Key, excludeLines) } diff --git a/filebeat/autodiscover/builder/logs/logs_test.go b/filebeat/autodiscover/builder/logs/logs_test.go new file mode 100644 index 000000000000..a8dfc9b9542f --- /dev/null +++ b/filebeat/autodiscover/builder/logs/logs_test.go @@ -0,0 +1,145 @@ +package logs + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" +) + +func TestGenerateHints(t *testing.T) { + tests := []struct { + event bus.Event + len int + result common.MapStr + }{ + // Hints without host should return nothing + { + event: bus.Event{ + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + }, + }, + }, + len: 0, + result: common.MapStr{}, + }, + // Empty event hints should return default config + { + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "docker": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + }, + len: 1, + result: common.MapStr{ + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + }, + }, + // Hint with include|exclude_lines must be part of the input config + { + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "docker": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "hints": common.MapStr{ + "logs": common.MapStr{ + "include_lines": "^test, ^test1", + "exclude_lines": "^test2, ^test3", + }, + }, + }, + len: 1, + result: common.MapStr{ + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "include_lines": []interface{}{"^test", "^test1"}, + "exclude_lines": []interface{}{"^test2", "^test3"}, + }, + }, + // Hint with multiline config must have a multiline in the input config + { + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "docker": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "hints": common.MapStr{ + "logs": common.MapStr{ + "multiline": common.MapStr{ + "pattern": "^test", + "negate": "true", + }, + }, + }, + }, + len: 1, + result: common.MapStr{ + "type": "docker", + "containers": map[string]interface{}{ + "ids": []interface{}{"abc"}, + }, + "multiline": map[string]interface{}{ + "pattern": "^test", + "negate": "true", + }, + }, + }, + } + + for _, test := range tests { + cfg := defaultConfig() + l := logAnnotations{ + Key: cfg.Key, + Config: cfg.Config, + } + cfgs := l.CreateConfig(test.event) + assert.Equal(t, len(cfgs), test.len) + + if test.len != 0 { + config := common.MapStr{} + err := cfgs[0].Unpack(&config) + assert.Nil(t, err) + + assert.Equal(t, config, test.result) + } + + } +} diff --git a/filebeat/autodiscover/include.go b/filebeat/autodiscover/include.go index d610b5c4230c..e1b2eb075e8b 100644 --- a/filebeat/autodiscover/include.go +++ b/filebeat/autodiscover/include.go @@ -2,5 +2,5 @@ package autodiscover import ( // include all filebeat specific builders - _ "github.com/elastic/beats/filebeat/autodiscover/builder/log_annotations" + _ "github.com/elastic/beats/filebeat/autodiscover/builder/logs" ) diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index f9dfb88c2645..449fedfe0e73 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -10,8 +10,6 @@ import ( "github.com/elastic/beats/libbeat/common/bus" "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/libbeat/autodiscover/template" ) type mockRunner struct { @@ -105,7 +103,7 @@ func TestAutodiscover(t *testing.T) { // Register mock autodiscover provider busChan := make(chan bus.Bus, 1) Registry = NewRegistry() - Registry.AddProvider("mock", func(b bus.Bus, mapper *template.Mapper, builders Builders, c *common.Config) (Provider, error) { + Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) { // intercept bus to mock events busChan <- b @@ -208,7 +206,7 @@ func TestAutodiscoverHash(t *testing.T) { busChan := make(chan bus.Bus, 1) Registry = NewRegistry() - Registry.AddProvider("mock", func(b bus.Bus, mapper *template.Mapper, builders Builders, c *common.Config) (Provider, error) { + Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) { // intercept bus to mock events busChan <- b diff --git a/libbeat/autodiscover/builder.go b/libbeat/autodiscover/builder.go index 5659faec4a9f..a15ca48b26ac 100644 --- a/libbeat/autodiscover/builder.go +++ b/libbeat/autodiscover/builder.go @@ -53,8 +53,8 @@ func (r *registry) GetBuilder(name string) BuilderConstructor { return r.builders[name] } -// ConstructBuilder reads provider configuration and instatiate one -func (r *registry) ConstructBuilder(c *common.Config) (Builder, error) { +// BuildBuilder reads provider configuration and instatiate one +func (r *registry) BuildBuilder(c *common.Config) (Builder, error) { var config BuilderConfig err := c.Unpack(&config) if err != nil { diff --git a/libbeat/autodiscover/builder/helper_test.go b/libbeat/autodiscover/builder/helper_test.go new file mode 100644 index 000000000000..1cfe55cfcb42 --- /dev/null +++ b/libbeat/autodiscover/builder/helper_test.go @@ -0,0 +1,52 @@ +package builder + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestGenerateHints(t *testing.T) { + tests := []struct { + annotations map[string]string + result common.MapStr + }{ + // Empty annotations should return empty hints + { + annotations: map[string]string{}, + result: common.MapStr{}, + }, + + // Scenarios being tested: + // logs/multiline.pattern must be a nested common.MapStr under hints.logs + // metrics/module must be found in hints.metrics + // not.to.include must not be part of hints + // period is annotated at both container and pod level. Container level value must be in hints + { + annotations: map[string]string{ + "co.elastic.logs/multiline.pattern": "^test", + "co.elastic.metrics/module": "prometheus", + "co.elastic.metrics/period": "10s", + "co.elastic.metrics.foobar/period": "15s", + "not.to.include": "true", + }, + result: common.MapStr{ + "logs": common.MapStr{ + "multiline": common.MapStr{ + "pattern": "^test", + }, + }, + "metrics": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + }, + }, + } + + for _, test := range tests { + assert.Equal(t, GenerateHints(test.annotations, "foobar", "co.elastic."), test.result) + } +} diff --git a/libbeat/autodiscover/builder_test.go b/libbeat/autodiscover/builder_test.go new file mode 100644 index 000000000000..23da96a0cdec --- /dev/null +++ b/libbeat/autodiscover/builder_test.go @@ -0,0 +1,57 @@ +package autodiscover + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" +) + +type fakeBuilder struct{} + +func (f *fakeBuilder) CreateConfig(event bus.Event) []*common.Config { + return []*common.Config{common.NewConfig()} +} + +func newFakeBuilder(_ *common.Config) (Builder, error) { + return &fakeBuilder{}, nil +} + +func TestBuilderRegistry(t *testing.T) { + // Add a new builder + reg := NewRegistry() + reg.AddBuilder("fake", newFakeBuilder) + + // Check if that builder is available in registry + b := reg.GetBuilder("fake") + assert.NotNil(t, b) + + // Generate a config with type fake + config := BuilderConfig{ + Type: "fake", + } + + cfg, err := common.NewConfigFrom(&config) + + // Make sure that config building doesn't fail + assert.Nil(t, err) + + builder, err := reg.BuildBuilder(cfg) + assert.Nil(t, err) + assert.NotNil(t, builder) + + // Try to create a config with fake builder and assert length + // of configs returned is one + res := builder.CreateConfig(nil) + assert.Equal(t, len(res), 1) + + builders := Builders{} + builders = append(builders, builder) + + // Try using builders object for the same as above and expect + // the same result + res = builders.GetConfig(nil) + assert.Equal(t, len(res), 1) +} diff --git a/libbeat/autodiscover/config.go b/libbeat/autodiscover/config.go index a4c5d1469e8c..dcc8d04cf217 100644 --- a/libbeat/autodiscover/config.go +++ b/libbeat/autodiscover/config.go @@ -1,7 +1,6 @@ package autodiscover import ( - "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common" ) @@ -12,9 +11,7 @@ type Config struct { // ProviderConfig settings type ProviderConfig struct { - Type string `config:"type"` - Builders []*common.Config `config:"builders"` - Templates template.MapperSettings `config:"templates"` + Type string `config:"type"` } // BuilderConfig settings diff --git a/libbeat/autodiscover/provider.go b/libbeat/autodiscover/provider.go index e21d1e1a7b89..417bee456ad5 100644 --- a/libbeat/autodiscover/provider.go +++ b/libbeat/autodiscover/provider.go @@ -4,7 +4,6 @@ import ( "fmt" "strings" - "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" @@ -17,7 +16,7 @@ type Provider interface { } // ProviderBuilder creates a new provider based on the given config and returns it -type ProviderBuilder func(bus.Bus, *template.Mapper, Builders, *common.Config) (Provider, error) +type ProviderBuilder func(bus.Bus, *common.Config) (Provider, error) // AddProvider registers a new ProviderBuilder func (r *registry) AddProvider(name string, provider ProviderBuilder) error { @@ -64,20 +63,5 @@ func (r *registry) BuildProvider(bus bus.Bus, c *common.Config) (Provider, error return nil, fmt.Errorf("Unknown autodiscover provider %s", config.Type) } - mapper, err := template.NewConfigMapper(config.Templates) - if err != nil { - return nil, err - } - - builders := Builders{} - for _, bCfg := range config.Builders { - builder, err := r.ConstructBuilder(bCfg) - if err != nil { - logp.Debug(debugK, "Could not generate builder due to error: %v", err) - } else { - builders = append(builders, builder) - } - } - - return builder(bus, mapper, builders, c) + return builder(bus, c) } diff --git a/libbeat/autodiscover/providers/docker/config.go b/libbeat/autodiscover/providers/docker/config.go index 2ae43667d946..0646c39a87a4 100644 --- a/libbeat/autodiscover/providers/docker/config.go +++ b/libbeat/autodiscover/providers/docker/config.go @@ -1,14 +1,18 @@ package docker import ( + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/docker" ) // Config for docker autodiscover provider type Config struct { - Host string `config:"host"` - TLS *docker.TLSConfig `config:"ssl"` - Prefix string `config:"string"` + Host string `config:"host"` + TLS *docker.TLSConfig `config:"ssl"` + Prefix string `config:"string"` + Builders []*common.Config `config:"builders"` + Templates template.MapperSettings `config:"templates"` } func defaultConfig() *Config { @@ -18,6 +22,7 @@ func defaultConfig() *Config { } } +// Validate ensures correctness of config func (c *Config) Validate() { // Make sure that prefix ends with a '.' if c.Prefix[len(c.Prefix)-1] != '.' { diff --git a/libbeat/autodiscover/providers/docker/docker.go b/libbeat/autodiscover/providers/docker/docker.go index aabdbe14c238..713c4747625d 100644 --- a/libbeat/autodiscover/providers/docker/docker.go +++ b/libbeat/autodiscover/providers/docker/docker.go @@ -27,7 +27,7 @@ type Provider struct { } // AutodiscoverBuilder builds and returns an autodiscover provider -func AutodiscoverBuilder(bus bus.Bus, mapper *template.Mapper, builders autodiscover.Builders, c *common.Config) (autodiscover.Provider, error) { +func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, error) { config := defaultConfig() err := c.Unpack(&config) if err != nil { @@ -39,6 +39,20 @@ func AutodiscoverBuilder(bus bus.Bus, mapper *template.Mapper, builders autodisc return nil, err } + mapper, err := template.NewConfigMapper(config.Templates) + if err != nil { + return nil, err + } + + var builders autodiscover.Builders + for _, bcfg := range config.Builders { + if builder, err := autodiscover.Registry.BuildBuilder(bcfg); err != nil { + logp.Debug("docker", "failed to construct autodiscover builder due to error: %v", err) + } else { + builders = append(builders, builder) + } + } + start := watcher.ListenStart() stop := watcher.ListenStop() @@ -90,17 +104,12 @@ func (d *Provider) emitContainer(event bus.Event, flag string) { host = container.IPAddresses[0] } - labelMap := common.MapStr{} - for k, v := range container.Labels { - labelMap[k] = v - } - meta := common.MapStr{ "container": common.MapStr{ "id": container.ID, "name": container.Name, "image": container.Image, - "labels": labelMap, + "labels": container.Labels, }, } @@ -139,30 +148,38 @@ func (d *Provider) publish(event bus.Event) { if config := d.templates.GetConfig(event); config != nil { event["config"] = config } else { - // Try to build a config with enabled builders. Send a provider agnostic payload. - // Builders are Beat specific. - e := bus.Event{} - dockerMeta, _ := event["docker"].(common.MapStr) - e["docker"] = dockerMeta - - if host, ok := event["host"]; ok { - e["host"] = host - } - if port, ok := event["port"]; ok { - e["port"] = port - } - if labels, err := dockerMeta.GetValue("docker.labels"); err == nil { - hints := builder.GenerateHints(labels.(map[string]string), "", d.config.Prefix) - e["hints"] = hints - } - - if config := d.builders.GetConfig(e); config != nil { + if config := d.builders.GetConfig(d.generateHints(event)); config != nil { event["config"] = config } } d.bus.Publish(event) } +func (d *Provider) generateHints(event bus.Event) bus.Event { + // Try to build a config with enabled builders. Send a provider agnostic payload. + // Builders are Beat specific. + e := bus.Event{} + var dockerMeta common.MapStr + + if rawDocker, ok := event["docker"]; ok { + dockerMeta = rawDocker.(common.MapStr) + e["docker"] = dockerMeta + } + + if host, ok := event["host"]; ok { + e["host"] = host + } + if port, ok := event["port"]; ok { + e["port"] = port + } + if labels, err := dockerMeta.GetValue("container.labels"); err == nil { + hints := builder.GenerateHints(labels.(map[string]string), "", d.config.Prefix) + e["hints"] = hints + } + + return e +} + // Stop the autodiscover process func (d *Provider) Stop() { close(d.stop) diff --git a/libbeat/autodiscover/providers/docker/docker_integration_test.go b/libbeat/autodiscover/providers/docker/docker_integration_test.go index e23384edb206..6eb56580533d 100644 --- a/libbeat/autodiscover/providers/docker/docker_integration_test.go +++ b/libbeat/autodiscover/providers/docker/docker_integration_test.go @@ -71,7 +71,7 @@ func checkEvent(t *testing.T, listener bus.Listener, start bool) { assert.Nil(t, getValue(e, "start")) } assert.Equal(t, getValue(e, "docker.container.image"), "busybox") - assert.Equal(t, getValue(e, "docker.container.labels"), common.MapStr{"label": "value"}) + assert.Equal(t, getValue(e, "docker.container.labels"), map[string]string{"label": "value"}) assert.NotNil(t, getValue(e, "docker.container.id")) assert.NotNil(t, getValue(e, "docker.container.name")) assert.NotNil(t, getValue(e, "host")) diff --git a/libbeat/autodiscover/providers/docker/docker_test.go b/libbeat/autodiscover/providers/docker/docker_test.go new file mode 100644 index 000000000000..33b0c037476d --- /dev/null +++ b/libbeat/autodiscover/providers/docker/docker_test.go @@ -0,0 +1,85 @@ +package docker + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" +) + +func TestGenerateHints(t *testing.T) { + tests := []struct { + event bus.Event + result bus.Event + }{ + // Empty events should return empty hints + { + event: bus.Event{}, + result: bus.Event{}, + }, + // Docker meta must be present in the hints + { + event: bus.Event{ + "docker": common.MapStr{ + "container": common.MapStr{ + "id": "abc", + "name": "foobar", + }, + }, + }, + result: bus.Event{ + "docker": common.MapStr{ + "container": common.MapStr{ + "id": "abc", + "name": "foobar", + }, + }, + }, + }, + // Docker labels are testing with the following scenarios + // do.not.include must not be part of the hints + // logs/disable should be present in hints.logs.disable=true + { + event: bus.Event{ + "docker": common.MapStr{ + "container": common.MapStr{ + "id": "abc", + "name": "foobar", + "labels": map[string]string{ + "do.not.include": "true", + "co.elastic.logs/disable": "true", + }, + }, + }, + }, + result: bus.Event{ + "docker": common.MapStr{ + "container": common.MapStr{ + "id": "abc", + "name": "foobar", + "labels": map[string]string{ + "do.not.include": "true", + "co.elastic.logs/disable": "true", + }, + }, + }, + "hints": common.MapStr{ + "logs": common.MapStr{ + "disable": "true", + }, + }, + }, + }, + } + + cfg := defaultConfig() + + p := Provider{ + config: cfg, + } + for _, test := range tests { + assert.Equal(t, p.generateHints(test.event), test.result) + } +} diff --git a/libbeat/autodiscover/providers/kubernetes/config.go b/libbeat/autodiscover/providers/kubernetes/config.go index 27e771174ca0..07b621af9ec3 100644 --- a/libbeat/autodiscover/providers/kubernetes/config.go +++ b/libbeat/autodiscover/providers/kubernetes/config.go @@ -2,6 +2,9 @@ package kubernetes import ( "time" + + "github.com/elastic/beats/libbeat/autodiscover/template" + "github.com/elastic/beats/libbeat/common" ) // Config for kubernetes autodiscover provider @@ -17,7 +20,9 @@ type Config struct { ExcludeLabels []string `config:"exclude_labels"` IncludeAnnotations []string `config:"include_annotations"` - Prefix string `config:"prefix"` + Prefix string `config:"prefix"` + Builders []*common.Config `config:"builders"` + Templates template.MapperSettings `config:"templates"` } func defaultConfig() *Config { @@ -29,6 +34,7 @@ func defaultConfig() *Config { } } +// Validate ensures correctness of config func (c *Config) Validate() { // Make sure that prefix ends with a '.' if c.Prefix[len(c.Prefix)-1] != '.' { diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 39077f9c4049..b3feda5f6c4b 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -27,7 +27,7 @@ type Provider struct { } // AutodiscoverBuilder builds and returns an autodiscover provider -func AutodiscoverBuilder(bus bus.Bus, mapper *template.Mapper, builders autodiscover.Builders, c *common.Config) (autodiscover.Provider, error) { +func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, error) { config := defaultConfig() err := c.Unpack(&config) if err != nil { @@ -53,6 +53,20 @@ func AutodiscoverBuilder(bus bus.Bus, mapper *template.Mapper, builders autodisc return nil, err } + mapper, err := template.NewConfigMapper(config.Templates) + if err != nil { + return nil, err + } + + var builders autodiscover.Builders + for _, bcfg := range config.Builders { + if builder, err := autodiscover.Registry.BuildBuilder(bcfg); err != nil { + logp.Debug("kubernetes", "failed to construct autodiscover builder due to error: %v", err) + } else { + builders = append(builders, builder) + } + } + p := &Provider{ config: config, bus: bus, @@ -97,22 +111,19 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kub containerstatuses []kubernetes.PodContainerStatus) { host := pod.Status.PodIP - // Collect all container IDs and runtimes from status information. Kubernetes has both docker and rkt + // Collect all container IDs containerIDs := map[string]string{} - runtimes := map[string]string{} for _, c := range containerstatuses { - cid, runtime := c.GetContainerIDWithRuntime() + cid := c.GetContainerID() containerIDs[c.Name] = cid - runtimes[c.Name] = runtime } // Emit container and port information for _, c := range containers { cmeta := common.MapStr{ - "id": containerIDs[c.Name], - "name": c.Name, - "image": c.Image, - "runtime": runtimes[c.Name], + "id": containerIDs[c.Name], + "name": c.Name, + "image": c.Image, } meta := p.metagen.ContainerMetadata(pod, c.Name) @@ -156,45 +167,53 @@ func (p *Provider) publish(event bus.Event) { if config := p.templates.GetConfig(event); config != nil { event["config"] = config } else { - // Try to build a config with enabled builders. Send a provider agnostic payload. - // Builders are Beat specific. - e := bus.Event{} - kubeMeta, _ := event["kubernetes"].(common.MapStr) - if host, ok := event["host"]; ok { - e["host"] = host - } - if port, ok := event["port"]; ok { - e["port"] = port + // If there isn't a default template then attempt to use builders + if config := p.builders.GetConfig(p.generateHints(event)); config != nil { + event["config"] = config } - // The builder base config can configure any of the field values of kubernetes if need be. - e["kubernetes"] = kubeMeta + } - annotations, _ := kubeMeta["annotations"].(map[string]string) - container, _ := kubeMeta["container"].(common.MapStr) + p.bus.Publish(event) +} - // This would end up adding a docker|rkt.container entry into the event. This would make sure - // that there is not an attempt to spin up a docker input for a rkt container and when a - // rkt input exists it would be natively supported. - if runtime, ok := container["runtime"]; ok { - e[runtime.(string)] = common.MapStr{ - "container": container, - } +func (p *Provider) generateHints(event bus.Event) bus.Event { + // Try to build a config with enabled builders. Send a provider agnostic payload. + // Builders are Beat specific. + e := bus.Event{} + var annotations map[string]string + var kubeMeta, container common.MapStr + rawMeta, ok := event["kubernetes"] + if ok { + kubeMeta = rawMeta.(common.MapStr) + // The builder base config can configure any of the field values of kubernetes if need be. + e["kubernetes"] = kubeMeta + if rawAnn, ok := kubeMeta["annotations"]; ok { + annotations = rawAnn.(map[string]string) } + } + if host, ok := event["host"]; ok { + e["host"] = host + } + if port, ok := event["port"]; ok { + e["port"] = port + } - cname := builder.GetContainerName(container) - hints := builder.GenerateHints(annotations, cname, p.config.Prefix) - if len(hints) != 0 { - e["hints"] = hints + if rawCont, ok := kubeMeta["container"]; ok { + container = rawCont.(common.MapStr) + e["docker"] = common.MapStr{ + "container": container, } + } - logp.Debug("kubernetes", "Generated builder event %v", event) + cname := builder.GetContainerName(container) + hints := builder.GenerateHints(annotations, cname, p.config.Prefix) + if len(hints) != 0 { + e["hints"] = hints + } - if config := p.builders.GetConfig(e); config != nil { - event["config"] = config - } + logp.Debug("kubernetes", "Generated builder event %v", event) - } - p.bus.Publish(event) + return e } // Stop signals the stop channel to force the watch loop routine to stop. diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go new file mode 100644 index 000000000000..9383c95dc2cc --- /dev/null +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go @@ -0,0 +1,128 @@ +package kubernetes + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" +) + +func TestGenerateHints(t *testing.T) { + tests := []struct { + event bus.Event + result bus.Event + }{ + // Empty events should return empty hints + { + event: bus.Event{}, + result: bus.Event{}, + }, + // Only kubernetes payload must return only kubernetes as part of the hint + { + event: bus.Event{ + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "foobar", + }, + }, + }, + result: bus.Event{ + "kubernetes": common.MapStr{ + "pod": common.MapStr{ + "name": "foobar", + }, + }, + }, + }, + // Kubernetes payload with container info must be bubbled to top level + { + event: bus.Event{ + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + }, + result: bus.Event{ + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "docker": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + }, + }, + // Scenarios being tested: + // logs/multiline.pattern must be a nested common.MapStr under hints.logs + // metrics/module must be found in hints.metrics + // not.to.include must not be part of hints + // period is annotated at both container and pod level. Container level value must be in hints + { + event: bus.Event{ + "kubernetes": common.MapStr{ + "annotations": map[string]string{ + "co.elastic.logs/multiline.pattern": "^test", + "co.elastic.metrics/module": "prometheus", + "co.elastic.metrics/period": "10s", + "co.elastic.metrics.foobar/period": "15s", + "not.to.include": "true", + }, + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + }, + result: bus.Event{ + "kubernetes": common.MapStr{ + "annotations": map[string]string{ + "co.elastic.logs/multiline.pattern": "^test", + "co.elastic.metrics/module": "prometheus", + "not.to.include": "true", + "co.elastic.metrics/period": "10s", + "co.elastic.metrics.foobar/period": "15s", + }, + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "hints": common.MapStr{ + "logs": common.MapStr{ + "multiline": common.MapStr{ + "pattern": "^test", + }, + }, + "metrics": common.MapStr{ + "module": "prometheus", + "period": "15s", + }, + }, + "docker": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + }, + }, + } + + cfg := defaultConfig() + + p := Provider{ + config: cfg, + } + for _, test := range tests { + assert.Equal(t, p.generateHints(test.event), test.result) + } +} diff --git a/libbeat/autodiscover/template/config.go b/libbeat/autodiscover/template/config.go index c14bcd6bdc43..eee8865907b4 100644 --- a/libbeat/autodiscover/template/config.go +++ b/libbeat/autodiscover/template/config.go @@ -6,8 +6,6 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - "fmt" - ucfg "github.com/elastic/go-ucfg" ) @@ -73,6 +71,7 @@ func (c *Mapper) GetConfig(event bus.Event) []*common.Config { return result } +// ApplyConfigTemplate takes a set of templated configs and applys information in an event map func ApplyConfigTemplate(event bus.Event, configs []*common.Config) []*common.Config { var result []*common.Config // unpack input @@ -101,7 +100,6 @@ func ApplyConfigTemplate(event bus.Event, configs []*common.Config) []*common.Co logp.Err("Error unpacking config: %v", err) continue } - fmt.Println(unpacked) // Repack again: res, err := common.NewConfigFrom(unpacked) if err != nil { diff --git a/libbeat/common/kubernetes/types.go b/libbeat/common/kubernetes/types.go index 7929cc076be5..2770fc9be7c5 100644 --- a/libbeat/common/kubernetes/types.go +++ b/libbeat/common/kubernetes/types.go @@ -133,20 +133,14 @@ func (p *Pod) GetMetadata() *ObjectMeta { // GetContainerID parses the container ID to get the actual ID string func (s *PodContainerStatus) GetContainerID() string { - cID, _ := s.GetContainerIDWithRuntime() - return cID -} - -// GetContainerID parses the container ID to get the actual ID string -func (s *PodContainerStatus) GetContainerIDWithRuntime() (string, string) { cID := s.ContainerID if cID != "" { - parts := strings.Split(cID, "://") + parts := strings.Split(cID, "//") if len(parts) == 2 { - return parts[1], parts[0] + return parts[1] } } - return "", "" + return "" } // Event is kubernetes event diff --git a/libbeat/common/kubernetes/types_test.go b/libbeat/common/kubernetes/types_test.go new file mode 100644 index 000000000000..a4e9120b231e --- /dev/null +++ b/libbeat/common/kubernetes/types_test.go @@ -0,0 +1,37 @@ +package kubernetes + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPodContainerStatus_GetContainerID(t *testing.T) { + tests := []struct { + status *PodContainerStatus + result string + }{ + // Check to see if x://y is parsed to return y as the container id + { + status: &PodContainerStatus{ + Name: "foobar", + ContainerID: "docker://abc", + Image: "foobar:latest", + }, + result: "abc", + }, + // Check to see if x://y is not the format then "" is returned + { + status: &PodContainerStatus{ + Name: "foobar", + ContainerID: "abc", + Image: "foobar:latest", + }, + result: "", + }, + } + + for _, test := range tests { + assert.Equal(t, test.status.GetContainerID(), test.result) + } +} diff --git a/metricbeat/autodiscover/builder/metric_annotations/config.go b/metricbeat/autodiscover/builder/metrics/config.go similarity index 81% rename from metricbeat/autodiscover/builder/metric_annotations/config.go rename to metricbeat/autodiscover/builder/metrics/config.go index 17519fcf0def..b1c621f9e8a0 100644 --- a/metricbeat/autodiscover/builder/metric_annotations/config.go +++ b/metricbeat/autodiscover/builder/metrics/config.go @@ -1,4 +1,4 @@ -package metric_annotations +package metrics type config struct { Key string `config:"key"` diff --git a/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go b/metricbeat/autodiscover/builder/metrics/metrics.go similarity index 88% rename from metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go rename to metricbeat/autodiscover/builder/metrics/metrics.go index a1fa64e8123c..3c13cd0637a6 100644 --- a/metricbeat/autodiscover/builder/metric_annotations/metric_annotations.go +++ b/metricbeat/autodiscover/builder/metrics/metrics.go @@ -1,4 +1,4 @@ -package metric_annotations +package metrics import ( "fmt" @@ -15,7 +15,7 @@ import ( ) func init() { - autodiscover.Registry.AddBuilder("metric.annotations", NewMetricAnnotations) + autodiscover.Registry.AddBuilder("metrics", NewMetricAnnotations) } const ( @@ -27,15 +27,15 @@ const ( timeout = "timeout" ssl = "ssl" - default_timeout = "3s" - default_interval = "1m" + defaultTimeout = "3s" + defaultInterval = "1m" ) type metricAnnotations struct { Key string } -// Build a new metrics annotation builder +// NewMetricAnnotations builds a new metrics annotation builder func NewMetricAnnotations(cfg *common.Config) (autodiscover.Builder, error) { config := defaultConfig() err := cfg.Unpack(&config) @@ -90,13 +90,14 @@ func (m *metricAnnotations) CreateConfig(event bus.Event) []*common.Config { for k, v := range sslConf { moduleConfig.Put(k, v) } - logp.Debug("metric.hints", "generated config: %v", moduleConfig.String()) + logp.Debug("metrics.builder", "generated config: %v", moduleConfig.String()) // Create config object cfg, err := common.NewConfigFrom(moduleConfig) if err != nil { - logp.Debug("metric.hints", "config merge failed with error: %v", err) + logp.Debug("metrics.builder", "config merge failed with error: %v", err) } + logp.Debug("metrics.builder", "generated config: %v", *cfg) config = append(config, cfg) // Apply information in event to the template to generate the final config @@ -118,7 +119,7 @@ func (m *metricAnnotations) getMetricSets(hints common.MapStr, module string) [] // Special handling for prometheus as most use cases rely on exporters/instrumentation. // Prometheus stats can be explicitly configured if need be. if module == "prometheus" { - return []string{"collector"} + msets = []string{"collector"} } else { msets = mb.Registry.MetricSets(module) } @@ -133,7 +134,7 @@ func (m *metricAnnotations) getHostsWithPort(hints common.MapStr, port int64) [] // Only pick hosts that have ${data.port} or the port on current event. This will make // sure that incorrect meta mapping doesn't happen for _, h := range thosts { - if strings.Contains(h, "data.port") || strings.Contains(h, fmt.Sprintf("%d", port)) { + if strings.Contains(h, "data.port") || strings.Contains(h, fmt.Sprintf(":%d", port)) { result = append(result, h) } } @@ -148,17 +149,16 @@ func (m *metricAnnotations) getNamespace(hints common.MapStr) string { func (m *metricAnnotations) getPeriod(hints common.MapStr) string { if ival := builder.GetHintString(hints, m.Key, period); ival != "" { return ival - } else { - return default_interval } + + return defaultInterval } func (m *metricAnnotations) getTimeout(hints common.MapStr) string { if tout := builder.GetHintString(hints, m.Key, timeout); tout != "" { return tout - } else { - return default_timeout } + return defaultTimeout } func (m *metricAnnotations) getSSLConfig(hints common.MapStr) common.MapStr { diff --git a/metricbeat/autodiscover/builder/metrics/metrics_test.go b/metricbeat/autodiscover/builder/metrics/metrics_test.go new file mode 100644 index 000000000000..662fb8ba52fa --- /dev/null +++ b/metricbeat/autodiscover/builder/metrics/metrics_test.go @@ -0,0 +1,155 @@ +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/bus" +) + +func TestGenerateHints(t *testing.T) { + tests := []struct { + event bus.Event + len int + result common.MapStr + }{ + // Empty event hints should return empty config + { + event: bus.Event{ + "host": "1.2.3.4", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + "docker": common.MapStr{ + "container": common.MapStr{ + "name": "foobar", + "id": "abc", + }, + }, + }, + len: 0, + result: common.MapStr{}, + }, + // Hints without host should return nothing + { + event: bus.Event{ + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + }, + }, + }, + len: 0, + result: common.MapStr{}, + }, + // Only module hint should return empty config + { + event: bus.Event{ + "host": "1.2.3.4", + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + }, + }, + }, + len: 1, + result: common.MapStr{ + "module": "prometheus", + "metricsets": []interface{}{"collector"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, + }, + // Only module, namespace hint should return empty config + { + event: bus.Event{ + "host": "1.2.3.4", + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + "namespace": "test", + }, + }, + }, + len: 1, + result: common.MapStr{ + "module": "prometheus", + "namespace": "test", + "metricsets": []interface{}{"collector"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, + }, + // Module, namespace, host hint should return valid config without port should not return hosts + { + event: bus.Event{ + "host": "1.2.3.4", + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + "namespace": "test", + "hosts": "${data.host}:9090", + }, + }, + }, + len: 1, + result: common.MapStr{ + "module": "prometheus", + "namespace": "test", + "metricsets": []interface{}{"collector"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, + }, + // Module, namespace, host hint should return valid config + { + event: bus.Event{ + "host": "1.2.3.4", + "port": int64(9090), + "hints": common.MapStr{ + "metrics": common.MapStr{ + "module": "prometheus", + "namespace": "test", + "hosts": "${data.host}:9090", + }, + }, + }, + len: 1, + result: common.MapStr{ + "module": "prometheus", + "namespace": "test", + "metricsets": []interface{}{"collector"}, + "hosts": []interface{}{"1.2.3.4:9090"}, + "timeout": "3s", + "period": "1m", + "enabled": true, + }, + }, + } + for _, test := range tests { + cfg := defaultConfig() + + m := metricAnnotations{ + Key: cfg.Key, + } + cfgs := m.CreateConfig(test.event) + assert.Equal(t, len(cfgs), test.len) + + if test.len != 0 { + config := common.MapStr{} + err := cfgs[0].Unpack(&config) + assert.Nil(t, err) + + assert.Equal(t, config, test.result) + } + + } +} diff --git a/metricbeat/autodiscover/include.go b/metricbeat/autodiscover/include.go index 4714c3c55b87..871253fe5ae9 100644 --- a/metricbeat/autodiscover/include.go +++ b/metricbeat/autodiscover/include.go @@ -2,5 +2,5 @@ package autodiscover import ( // include all filebeat specific builders - _ "github.com/elastic/beats/metricbeat/autodiscover/builder/metric_annotations" + _ "github.com/elastic/beats/metricbeat/autodiscover/builder/metrics" ) From 58cc1e349114462cd08bd2580bafd240af78f364 Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Wed, 21 Feb 2018 08:47:27 -0800 Subject: [PATCH 5/5] Make annotations to use common.MapStr --- libbeat/autodiscover/builder/helper.go | 2 +- libbeat/autodiscover/builder/helper_test.go | 6 +++--- libbeat/autodiscover/providers/docker/docker.go | 9 +++++++-- .../providers/docker/docker_integration_test.go | 2 +- libbeat/autodiscover/providers/docker/docker_test.go | 4 ++-- .../autodiscover/providers/kubernetes/kubernetes.go | 10 +++++++--- .../providers/kubernetes/kubernetes_test.go | 4 ++-- 7 files changed, 23 insertions(+), 14 deletions(-) diff --git a/libbeat/autodiscover/builder/helper.go b/libbeat/autodiscover/builder/helper.go index 0a4f919eeb87..579a7eabe4e7 100644 --- a/libbeat/autodiscover/builder/helper.go +++ b/libbeat/autodiscover/builder/helper.go @@ -75,7 +75,7 @@ func IsNoOp(hints common.MapStr, key string) bool { } // GenerateHints parses annotations based on a prefix and sets up hints that can be picked up by individual Beats. -func GenerateHints(annotations map[string]string, container, prefix string) common.MapStr { +func GenerateHints(annotations common.MapStr, container, prefix string) common.MapStr { hints := common.MapStr{} plen := len(prefix) diff --git a/libbeat/autodiscover/builder/helper_test.go b/libbeat/autodiscover/builder/helper_test.go index 1cfe55cfcb42..8b0852f86a6e 100644 --- a/libbeat/autodiscover/builder/helper_test.go +++ b/libbeat/autodiscover/builder/helper_test.go @@ -10,12 +10,12 @@ import ( func TestGenerateHints(t *testing.T) { tests := []struct { - annotations map[string]string + annotations common.MapStr result common.MapStr }{ // Empty annotations should return empty hints { - annotations: map[string]string{}, + annotations: common.MapStr{}, result: common.MapStr{}, }, @@ -25,7 +25,7 @@ func TestGenerateHints(t *testing.T) { // not.to.include must not be part of hints // period is annotated at both container and pod level. Container level value must be in hints { - annotations: map[string]string{ + annotations: common.MapStr{ "co.elastic.logs/multiline.pattern": "^test", "co.elastic.metrics/module": "prometheus", "co.elastic.metrics/period": "10s", diff --git a/libbeat/autodiscover/providers/docker/docker.go b/libbeat/autodiscover/providers/docker/docker.go index 713c4747625d..1c04f29da08f 100644 --- a/libbeat/autodiscover/providers/docker/docker.go +++ b/libbeat/autodiscover/providers/docker/docker.go @@ -104,12 +104,17 @@ func (d *Provider) emitContainer(event bus.Event, flag string) { host = container.IPAddresses[0] } + labelMap := common.MapStr{} + for k, v := range container.Labels { + labelMap[k] = v + } + meta := common.MapStr{ "container": common.MapStr{ "id": container.ID, "name": container.Name, "image": container.Image, - "labels": container.Labels, + "labels": labelMap, }, } @@ -173,7 +178,7 @@ func (d *Provider) generateHints(event bus.Event) bus.Event { e["port"] = port } if labels, err := dockerMeta.GetValue("container.labels"); err == nil { - hints := builder.GenerateHints(labels.(map[string]string), "", d.config.Prefix) + hints := builder.GenerateHints(labels.(common.MapStr), "", d.config.Prefix) e["hints"] = hints } diff --git a/libbeat/autodiscover/providers/docker/docker_integration_test.go b/libbeat/autodiscover/providers/docker/docker_integration_test.go index 6eb56580533d..e23384edb206 100644 --- a/libbeat/autodiscover/providers/docker/docker_integration_test.go +++ b/libbeat/autodiscover/providers/docker/docker_integration_test.go @@ -71,7 +71,7 @@ func checkEvent(t *testing.T, listener bus.Listener, start bool) { assert.Nil(t, getValue(e, "start")) } assert.Equal(t, getValue(e, "docker.container.image"), "busybox") - assert.Equal(t, getValue(e, "docker.container.labels"), map[string]string{"label": "value"}) + assert.Equal(t, getValue(e, "docker.container.labels"), common.MapStr{"label": "value"}) assert.NotNil(t, getValue(e, "docker.container.id")) assert.NotNil(t, getValue(e, "docker.container.name")) assert.NotNil(t, getValue(e, "host")) diff --git a/libbeat/autodiscover/providers/docker/docker_test.go b/libbeat/autodiscover/providers/docker/docker_test.go index 33b0c037476d..78f69c60ff9b 100644 --- a/libbeat/autodiscover/providers/docker/docker_test.go +++ b/libbeat/autodiscover/providers/docker/docker_test.go @@ -47,7 +47,7 @@ func TestGenerateHints(t *testing.T) { "container": common.MapStr{ "id": "abc", "name": "foobar", - "labels": map[string]string{ + "labels": common.MapStr{ "do.not.include": "true", "co.elastic.logs/disable": "true", }, @@ -59,7 +59,7 @@ func TestGenerateHints(t *testing.T) { "container": common.MapStr{ "id": "abc", "name": "foobar", - "labels": map[string]string{ + "labels": common.MapStr{ "do.not.include": "true", "co.elastic.logs/disable": "true", }, diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index b3feda5f6c4b..53aad1bc5e80 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -132,7 +132,11 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kub kubemeta["container"] = cmeta // Pass annotations to all events so that it can be used in templating and by annotation builders. - kubemeta["annotations"] = pod.GetMetadata().Annotations + annotations := common.MapStr{} + for k, v := range pod.GetMetadata().Annotations { + annotations[k] = v + } + kubemeta["annotations"] = annotations // Without this check there would be overlapping configurations with and without ports. if len(c.Ports) == 0 { @@ -180,7 +184,7 @@ func (p *Provider) generateHints(event bus.Event) bus.Event { // Try to build a config with enabled builders. Send a provider agnostic payload. // Builders are Beat specific. e := bus.Event{} - var annotations map[string]string + var annotations common.MapStr var kubeMeta, container common.MapStr rawMeta, ok := event["kubernetes"] if ok { @@ -188,7 +192,7 @@ func (p *Provider) generateHints(event bus.Event) bus.Event { // The builder base config can configure any of the field values of kubernetes if need be. e["kubernetes"] = kubeMeta if rawAnn, ok := kubeMeta["annotations"]; ok { - annotations = rawAnn.(map[string]string) + annotations = rawAnn.(common.MapStr) } } if host, ok := event["host"]; ok { diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go index 9383c95dc2cc..0e06a36f5c02 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go @@ -69,7 +69,7 @@ func TestGenerateHints(t *testing.T) { { event: bus.Event{ "kubernetes": common.MapStr{ - "annotations": map[string]string{ + "annotations": common.MapStr{ "co.elastic.logs/multiline.pattern": "^test", "co.elastic.metrics/module": "prometheus", "co.elastic.metrics/period": "10s", @@ -84,7 +84,7 @@ func TestGenerateHints(t *testing.T) { }, result: bus.Event{ "kubernetes": common.MapStr{ - "annotations": map[string]string{ + "annotations": common.MapStr{ "co.elastic.logs/multiline.pattern": "^test", "co.elastic.metrics/module": "prometheus", "not.to.include": "true",