diff --git a/filebeat/processor/add_kubernetes_metadata/matchers.go b/filebeat/processor/add_kubernetes_metadata/matchers.go index 35866e45166..7e7996fede7 100644 --- a/filebeat/processor/add_kubernetes_metadata/matchers.go +++ b/filebeat/processor/add_kubernetes_metadata/matchers.go @@ -125,8 +125,7 @@ func (f *LogPathMatcher) MetadataIndex(event common.MapStr) string { } } - f.logger.Error(`Error extracting pod uid - source value does not contains matcher's logs_path, - supported log_path for 'pod' resource_type: '/var/lib/kubelet/pods/', '/var/log/pods/'.`) + f.logger.Error("Error extracting pod uid - source value does not contains matcher's logs_path") return "" } } diff --git a/libbeat/processors/add_kubernetes_metadata/config.go b/libbeat/processors/add_kubernetes_metadata/config.go index 6e738403087..41bf543b1f0 100644 --- a/libbeat/processors/add_kubernetes_metadata/config.go +++ b/libbeat/processors/add_kubernetes_metadata/config.go @@ -69,5 +69,34 @@ func (k *kubeAnnotatorConfig) Validate() error { k.Host = "" } + for _, matcher := range k.Matchers { + if matcherCfg, ok := matcher["logs_path"]; ok { + if matcherCfg.HasField("resource_type") { + logsPathMatcher := struct { + LogsPath string `config:"logs_path"` + ResourceType string `config:"resource_type"` + }{} + + err := matcherCfg.Unpack(&logsPathMatcher) + if err != nil { + return fmt.Errorf("fail to unpack the `logs_path` matcher configuration: %s", err) + } + if logsPathMatcher.LogsPath == "" { + return fmt.Errorf("invalid logs_path matcher configuration: when resource_type is defined, logs_path must be set as well") + } + if logsPathMatcher.ResourceType != "pod" && logsPathMatcher.ResourceType != "container" { + return fmt.Errorf("invalid resource_type %s, valid values include `pod`, `container`", logsPathMatcher.ResourceType) + } + if logsPathMatcher.ResourceType == "pod" && !(logsPathMatcher.LogsPath == "/var/lib/kubelet/pods/" || logsPathMatcher.LogsPath == "/var/log/pods/") { + return fmt.Errorf("invalid logs_path defined for resource_type: %s, valid values include `/var/lib/kubelet/pods/`, `/var/log/pods/`", logsPathMatcher.ResourceType) + } + if logsPathMatcher.ResourceType == "container" && logsPathMatcher.LogsPath != "/var/log/containers/" { + return fmt.Errorf("invalid logs_path defined for resource_type: %s, valid value is `/var/log/containers/`", logsPathMatcher.ResourceType) + } + } + + } + } + return nil } diff --git a/libbeat/processors/add_kubernetes_metadata/config_test.go b/libbeat/processors/add_kubernetes_metadata/config_test.go index 3bdcf34a1d7..f7cc83812f6 100644 --- a/libbeat/processors/add_kubernetes_metadata/config_test.go +++ b/libbeat/processors/add_kubernetes_metadata/config_test.go @@ -60,3 +60,76 @@ func TestConfigValidate(t *testing.T) { } } } + +func TestConfigValidate_LogsPatchMatcher(t *testing.T) { + tests := []struct { + matcherName string + matcherConfig map[string]interface{} + error bool + }{ + { + matcherName: "", + matcherConfig: map[string]interface{}{}, + error: false, + }, + { + matcherName: "logs_path", + matcherConfig: map[string]interface{}{ + "resource_type": "pod", + }, + error: true, + }, + { + matcherName: "logs_path", + matcherConfig: map[string]interface{}{ + "resource_type": "pod", + "invalid_field": "invalid_value", + }, + error: true, + }, + { + matcherName: "logs_path", + matcherConfig: map[string]interface{}{ + "resource_type": "pod", + "logs_path": "/var/log/invalid/path/", + }, + error: true, + }, + { + matcherName: "logs_path", + matcherConfig: map[string]interface{}{ + "resource_type": "pod", + "logs_path": "/var/log/pods/", + }, + error: false, + }, + { + matcherName: "logs_path", + matcherConfig: map[string]interface{}{ + "resource_type": "container", + "logs_path": "/var/log/containers/", + }, + error: false, + }, + } + + for _, test := range tests { + cfg, _ := common.NewConfigFrom(test.matcherConfig) + + c := defaultKubernetesAnnotatorConfig() + c.DefaultMatchers = Enabled{false} + + err := cfg.Unpack(&c) + c.Matchers = PluginConfig{ + { + test.matcherName: *cfg, + }, + } + err = c.Validate() + if test.error { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + } +}