From a5068da5a0cbb68a91871a7d886aaa130c4bbe1d Mon Sep 17 00:00:00 2001 From: Christos Markou Date: Tue, 26 Nov 2024 19:11:09 +0200 Subject: [PATCH] [receiver/receiver_creator] Add support for enabling receivers/scrapers from K8s hints (#35617) --- .chloggen/hints.yaml | 27 ++ extension/observer/endpoints.go | 6 +- receiver/receivercreator/README.md | 188 +++++++++ receiver/receivercreator/config.go | 14 + receiver/receivercreator/config_test.go | 2 + receiver/receivercreator/discovery.go | 204 +++++++++ receiver/receivercreator/discovery_test.go | 398 ++++++++++++++++++ receiver/receivercreator/fixtures_test.go | 22 + receiver/receivercreator/go.mod | 4 +- receiver/receivercreator/observerhandler.go | 170 ++++---- .../receivercreator/observerhandler_test.go | 66 +++ 11 files changed, 1025 insertions(+), 76 deletions(-) create mode 100644 .chloggen/hints.yaml create mode 100644 receiver/receivercreator/discovery.go create mode 100644 receiver/receivercreator/discovery_test.go diff --git a/.chloggen/hints.yaml b/.chloggen/hints.yaml new file mode 100644 index 000000000000..764dc23441f2 --- /dev/null +++ b/.chloggen/hints.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receivercreator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for starting receivers/scrapers based on provided annotations' hints for metrics' collection + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34427] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/extension/observer/endpoints.go b/extension/observer/endpoints.go index cd51a35e036d..2f58c6932c26 100644 --- a/extension/observer/endpoints.go +++ b/extension/observer/endpoints.go @@ -223,11 +223,11 @@ func (p *Pod) Type() EndpointType { // PodContainer is a discovered k8s pod's container type PodContainer struct { // Name of the container - Name string + Name string `mapstructure:"container_name"` // Image of the container - Image string + Image string `mapstructure:"container_image"` // ContainerID is the id of the container exposing the Endpoint - ContainerID string + ContainerID string `mapstructure:"container_id"` // Pod is the k8s pod in which the container is running Pod Pod } diff --git a/receiver/receivercreator/README.md b/receiver/receivercreator/README.md index 70ed6e5cb808..ac836eb98e02 100644 --- a/receiver/receivercreator/README.md +++ b/receiver/receivercreator/README.md @@ -439,3 +439,191 @@ service: The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). + + +## Generate receiver configurations from provided Hints + +Note: When hints feature is enabled if hints are present for an endpoint no receiver templates will be evaluated. + +Currently this feature is only supported for K8s environments and the `k8sobserver`. + +The discovery feature for K8s is enabled with the following setting: + +```yaml +receiver_creator/metrics: + watch_observers: [ k8s_observer ] + discovery: + enabled: true + # Define which receivers should be ignored when provided through annotations + # ignore_receivers: [] +``` + +Find bellow the supported annotations that user can define to automatically enable receivers to start collecting metrics signals from the target Pods/containers. + +### Supported metrics annotations + +#### Enable/disable discovery + +`io.opentelemetry.discovery.metrics/enabled` (Required. `"true"` or `"false"`) + +#### Define scraper + +`io.opentelemetry.discovery.metrics/scraper` (example: `"nginx"`) + + +#### Define configuration + +`io.opentelemetry.discovery.metrics/config` + +For `"endpoint"` setting specifically, it sticks to urls that include +```"`endpoint`"``` as it comes from the Port endpoint which is +in form of `pod_ip:container_port`. This is to ensure that each Pod can only +generate configuration that targets itself and not others. +If no endpoint is provided the Pod's endpoint will be used (in form of `pod_ip:container_port`). + +**Example:** + +```yaml +io.opentelemetry.discovery.metrics/config: | + endpoint: "http://`endpoint`/nginx_status" + collection_interval: "20s" + initial_delay: "20s" + read_buffer_size: "10" + xyz: "abc" +``` + + +#### Support multiple target containers + +Users can target the annotation to a specific container by suffixing it with the name of the port that container exposes: +`io.opentelemetry.discovery.metrics./config`. +For example: +```yaml +io.opentelemetry.discovery.metrics.80/config: | + endpoint: "http://`endpoint`/nginx_status" +``` +where `80` is the port that the target container exposes. + +If a Pod is annotated with both container level hints and pod level hints the container level hints have priority and +the Pod level hints are used as a fallback (see detailed example bellow). + +The current implementation relies on the implementation of `k8sobserver` extension and specifically +the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go). +The hints are evaluated per container by extracting the annotations from each [`Port` endpoint](#Port) that is emitted. + + + +### Examples + +#### Metrics example + +Collector's configuration: +```yaml +receivers: + receiver_creator/metrics: + watch_observers: [ k8s_observer ] + discovery: + enabled: true + receivers: + +service: + extensions: [ k8s_observer] + pipelines: + metrics: + receivers: [ receiver_creator ] + processors: [] + exporters: [ debug ] +``` + +Target Pod annotated with hints: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: nginx-conf +data: + nginx.conf: | + user nginx; + worker_processes 1; + error_log /dev/stderr warn; + pid /var/run/nginx.pid; + events { + worker_connections 1024; + } + http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + access_log /dev/stdout main; + server { + listen 80; + server_name localhost; + + location /nginx_status { + stub_status on; + } + } + include /etc/nginx/conf.d/*; + } +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: redis-deployment + labels: + app: redis +spec: + replicas: 1 + selector: + matchLabels: + app: redis + template: + metadata: + labels: + app: redis + annotations: + # redis container port metrics hints + io.opentelemetry.discovery.metrics.6379/enabled: "true" + io.opentelemetry.discovery.metrics.6379/scraper: redis + io.opentelemetry.discovery.metrics.6379/config: | + collection_interval: "20s" + timeout: "10s" + + # nginx container port metrics hints + io.opentelemetry.discovery.metrics.80/enabled: "true" + io.opentelemetry.discovery.metrics.80/scraper: nginx + io.opentelemetry.discovery.metrics.80/config: | + endpoint: "http://`endpoint`/nginx_status" + collection_interval: "30s" + timeout: "20s" + spec: + volumes: + - name: nginx-conf + configMap: + name: nginx-conf + items: + - key: nginx.conf + path: nginx.conf + containers: + - name: webserver + image: nginx:latest + ports: + - containerPort: 80 + name: webserver + volumeMounts: + - mountPath: /etc/nginx/nginx.conf + readOnly: true + subPath: nginx.conf + name: nginx-conf + - image: redis + imagePullPolicy: IfNotPresent + name: redis + ports: + - name: redis + containerPort: 6379 + protocol: TCP +``` \ No newline at end of file diff --git a/receiver/receivercreator/config.go b/receiver/receivercreator/config.go index bb5ebfaa4f6f..e531ccf8c913 100644 --- a/receiver/receivercreator/config.go +++ b/receiver/receivercreator/config.go @@ -35,6 +35,12 @@ type receiverConfig struct { // userConfigMap is an arbitrary map of string keys to arbitrary values as specified by the user type userConfigMap map[string]any +type receiverSignals struct { + metrics bool + logs bool + traces bool +} + // receiverTemplate is the configuration of a single subreceiver. type receiverTemplate struct { receiverConfig @@ -46,6 +52,7 @@ type receiverTemplate struct { // It can contain expr expressions for endpoint env value expansion ResourceAttributes map[string]any `mapstructure:"resource_attributes"` rule rule + signals receiverSignals } // resourceAttributes holds a map of default resource attributes for each Endpoint type. @@ -60,6 +67,7 @@ func newReceiverTemplate(name string, cfg userConfigMap) (receiverTemplate, erro } return receiverTemplate{ + signals: receiverSignals{metrics: true, logs: true, traces: true}, receiverConfig: receiverConfig{ id: id, config: cfg, @@ -78,6 +86,12 @@ type Config struct { // ResourceAttributes is a map of default resource attributes to add to each resource // object received by this receiver from dynamically created receivers. ResourceAttributes resourceAttributes `mapstructure:"resource_attributes"` + Discovery DiscoveryConfig `mapstructure:"discovery"` +} + +type DiscoveryConfig struct { + Enabled bool `mapstructure:"enabled"` + IgnoreReceivers []string `mapstructure:"ignore_receivers"` } func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { diff --git a/receiver/receivercreator/config_test.go b/receiver/receivercreator/config_test.go index ee8779de712e..618add4e174d 100644 --- a/receiver/receivercreator/config_test.go +++ b/receiver/receivercreator/config_test.go @@ -90,6 +90,7 @@ func TestLoadConfig(t *testing.T) { Rule: `type == "port"`, ResourceAttributes: map[string]any{"one": "two"}, rule: portRule, + signals: receiverSignals{true, true, true}, }, "nop/1": { receiverConfig: receiverConfig{ @@ -102,6 +103,7 @@ func TestLoadConfig(t *testing.T) { Rule: `type == "port"`, ResourceAttributes: map[string]any{"two": "three"}, rule: portRule, + signals: receiverSignals{true, true, true}, }, }, WatchObservers: []component.ID{ diff --git a/receiver/receivercreator/discovery.go b/receiver/receivercreator/discovery.go new file mode 100644 index 000000000000..f8a694912751 --- /dev/null +++ b/receiver/receivercreator/discovery.go @@ -0,0 +1,204 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receivercreator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator" + +import ( + "fmt" + "net/url" + "strings" + + "github.com/go-viper/mapstructure/v2" + "go.uber.org/zap" + "gopkg.in/yaml.v3" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +const ( + // hints prefix + otelHints = "io.opentelemetry.discovery" + + // hint suffix for metrics + otelMetricsHints = otelHints + ".metrics" + + // hints definitions + discoveryEnabledHint = "enabled" + scraperHint = "scraper" + configHint = "config" +) + +// k8sHintsBuilder creates configurations from hints provided as Pod's annotations. +type k8sHintsBuilder struct { + logger *zap.Logger + ignoreReceivers map[string]bool +} + +func createK8sHintsBuilder(config DiscoveryConfig, logger *zap.Logger) k8sHintsBuilder { + ignoreReceivers := make(map[string]bool, len(config.IgnoreReceivers)) + for _, r := range config.IgnoreReceivers { + ignoreReceivers[r] = true + } + return k8sHintsBuilder{ + logger: logger, + ignoreReceivers: ignoreReceivers, + } +} + +// createReceiverTemplateFromHints creates a receiver configuration based on the provided hints. +// Hints are extracted from Pod's annotations. +// Scraper configurations are only created for Port Endpoints. +// TODO: Log receiver configurations are only created for Pod Container Endpoints. +func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.EndpointEnv) (*receiverTemplate, error) { + var pod observer.Pod + + endpointType := getStringEnv(env, "type") + if endpointType == "" { + return nil, fmt.Errorf("could not get endpoint type: %v", zap.Any("env", env)) + } + + if endpointType != string(observer.PortType) { + return nil, nil + } + + builder.logger.Debug("handling hints for added endpoint", zap.Any("env", env)) + + if endpointPod, ok := env["pod"]; ok { + err := mapstructure.Decode(endpointPod, &pod) + if err != nil { + return nil, fmt.Errorf("could not extract endpoint's pod: %v", zap.Any("endpointPod", pod)) + } + } else { + return nil, nil + } + + return builder.createScraper(pod.Annotations, env) +} + +func (builder *k8sHintsBuilder) createScraper( + annotations map[string]string, + env observer.EndpointEnv, +) (*receiverTemplate, error) { + var port uint16 + var p observer.Port + err := mapstructure.Decode(env, &p) + if err != nil { + return nil, fmt.Errorf("could not extract port event: %v", zap.Any("env", env)) + } + if p.Port == 0 { + return nil, fmt.Errorf("could not extract port: %v", zap.Any("env", env)) + } + port = p.Port + pod := p.Pod + + if !discoveryMetricsEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) { + return nil, nil + } + + subreceiverKey, found := getHintAnnotation(annotations, otelMetricsHints, scraperHint, fmt.Sprint(port)) + if !found || subreceiverKey == "" { + // no scraper hint detected + return nil, nil + } + if _, ok := builder.ignoreReceivers[subreceiverKey]; ok { + // scraper is ignored + return nil, nil + } + builder.logger.Debug("handling added hinted receiver", zap.Any("subreceiverKey", subreceiverKey)) + + defaultEndpoint := getStringEnv(env, endpointConfigKey) + userConfMap, err := getScraperConfFromAnnotations(annotations, defaultEndpoint, fmt.Sprint(port), builder.logger) + if err != nil { + return nil, fmt.Errorf("could not create receiver configuration: %v", zap.Any("err", err)) + } + + recTemplate, err := newReceiverTemplate(fmt.Sprintf("%v/%v_%v", subreceiverKey, pod.UID, port), userConfMap) + recTemplate.signals = receiverSignals{true, false, false} + + return &recTemplate, err +} + +func getScraperConfFromAnnotations( + annotations map[string]string, + defaultEndpoint, scopeSuffix string, + logger *zap.Logger, +) (userConfigMap, error) { + conf := userConfigMap{} + conf[endpointConfigKey] = defaultEndpoint + + configStr, found := getHintAnnotation(annotations, otelMetricsHints, configHint, scopeSuffix) + if !found || configStr == "" { + return conf, nil + } + if err := yaml.Unmarshal([]byte(configStr), &conf); err != nil { + return userConfigMap{}, fmt.Errorf("could not unmarshal configuration from hint: %v", zap.Error(err)) + } + + val := conf[endpointConfigKey] + confEndpoint, ok := val.(string) + if !ok { + logger.Debug("could not extract configured endpoint") + return userConfigMap{}, fmt.Errorf("could not extract configured endpoint") + } + + err := validateEndpoint(confEndpoint, defaultEndpoint) + if err != nil { + logger.Debug("configured endpoint is not valid", zap.Error(err)) + return userConfigMap{}, fmt.Errorf("configured endpoint is not valid: %v", zap.Error(err)) + } + return conf, nil +} + +func getHintAnnotation(annotations map[string]string, hintBase string, hintKey string, suffix string) (string, bool) { + // try to scope the hint more on container level by suffixing + // with . in case of Port event or # TODO: . in case of Pod Container event + containerLevelHint, ok := annotations[fmt.Sprintf("%s.%s/%s", hintBase, suffix, hintKey)] + if ok { + return containerLevelHint, ok + } + + // if there is no container level hint defined try to use the Pod level hint + podLevelHint, ok := annotations[fmt.Sprintf("%s/%s", hintBase, hintKey)] + return podLevelHint, ok +} + +func discoveryMetricsEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool { + enabledHint, found := getHintAnnotation(annotations, hintBase, discoveryEnabledHint, scopeSuffix) + if !found { + return false + } + return enabledHint == "true" +} + +func getStringEnv(env observer.EndpointEnv, key string) string { + var valString string + if val, ok := env[key]; ok { + valString, ok = val.(string) + if !ok { + return "" + } + } + return valString +} + +func validateEndpoint(endpoint, defaultEndpoint string) error { + // replace temporarily the dynamic reference to ease the url parsing + endpoint = strings.ReplaceAll(endpoint, "`endpoint`", defaultEndpoint) + + uri, _ := url.Parse(endpoint) + // target endpoint can come in form ip:port. In that case we fix the uri + // temporarily with adding http scheme + if uri == nil { + u, err := url.Parse("http://" + endpoint) + if err != nil { + return fmt.Errorf("could not parse enpoint") + } + uri = u + } + + // configured endpoint should include the target Pod's endpoint + if uri.Host != defaultEndpoint { + return fmt.Errorf("configured enpoint should include target Pod's endpoint") + } + return nil +} diff --git a/receiver/receivercreator/discovery_test.go b/receiver/receivercreator/discovery_test.go new file mode 100644 index 000000000000..982f78de8396 --- /dev/null +++ b/receiver/receivercreator/discovery_test.go @@ -0,0 +1,398 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receivercreator + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +func TestK8sHintsBuilderMetrics(t *testing.T) { + logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)) + + id := component.ID{} + err := id.UnmarshalText([]byte("redis/pod-2-UID_6379")) + assert.NoError(t, err) + + config := ` +collection_interval: "20s" +timeout: "30s" +username: "username" +password: "changeme"` + configRedis := ` +collection_interval: "20s" +timeout: "130s" +username: "username" +password: "changeme"` + + tests := map[string]struct { + inputEndpoint observer.Endpoint + expectedReceiver receiverTemplate + ignoreReceivers []string + wantError bool + }{ + `metrics_pod_level_hints_only`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + otelMetricsHints + "/enabled": "true", + otelMetricsHints + "/scraper": "redis", + otelMetricsHints + "/config": config, + }, + }, + Port: 6379, + }, + }, + expectedReceiver: receiverTemplate{ + receiverConfig: receiverConfig{ + id: id, + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "30s", "username": "username"}, + }, signals: receiverSignals{metrics: true, logs: false, traces: false}, + }, + wantError: false, + ignoreReceivers: []string{}, + }, `metrics_pod_level_ignore`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + otelMetricsHints + "/enabled": "true", + otelMetricsHints + "/scraper": "redis", + otelMetricsHints + "/config": config, + }, + }, + Port: 6379, + }, + }, + expectedReceiver: receiverTemplate{}, + wantError: false, + ignoreReceivers: []string{"redis"}, + }, `metrics_pod_level_hints_only_defaults`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + otelMetricsHints + "/enabled": "true", + otelMetricsHints + "/scraper": "redis", + }, + }, + Port: 6379, + }, + }, + expectedReceiver: receiverTemplate{ + receiverConfig: receiverConfig{ + id: id, + config: userConfigMap{"endpoint": "1.2.3.4:6379"}, + }, signals: receiverSignals{metrics: true, logs: false, traces: false}, + }, + wantError: false, + ignoreReceivers: []string{}, + }, `metrics_container_level_hints`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + otelMetricsHints + ".6379/enabled": "true", + otelMetricsHints + ".6379/scraper": "redis", + otelMetricsHints + ".6379/config": config, + }, + }, + Port: 6379, + }, + }, + expectedReceiver: receiverTemplate{ + receiverConfig: receiverConfig{ + id: id, + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "30s", "username": "username"}, + }, signals: receiverSignals{metrics: true, logs: false, traces: false}, + }, + wantError: false, + ignoreReceivers: []string{}, + }, `metrics_mix_level_hints`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + otelMetricsHints + ".6379/enabled": "true", + otelMetricsHints + ".6379/scraper": "redis", + otelMetricsHints + "/config": config, + otelMetricsHints + ".6379/config": configRedis, + }, + }, + Port: 6379, + }, + }, + expectedReceiver: receiverTemplate{ + receiverConfig: receiverConfig{ + id: id, + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "130s", "username": "username"}, + }, signals: receiverSignals{metrics: true, logs: false, traces: false}, + }, + wantError: false, + ignoreReceivers: []string{}, + }, `metrics_no_port_error`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + otelMetricsHints + "/enabled": "true", + otelMetricsHints + "/scraper": "redis", + otelMetricsHints + "/config": config, + }, + }, + }, + }, + expectedReceiver: receiverTemplate{}, + wantError: true, + ignoreReceivers: []string{}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + builder := createK8sHintsBuilder(DiscoveryConfig{Enabled: true, IgnoreReceivers: test.ignoreReceivers}, logger) + env, err := test.inputEndpoint.Env() + require.NoError(t, err) + subreceiverTemplate, err := builder.createReceiverTemplateFromHints(env) + if subreceiverTemplate == nil { + require.Equal(t, receiverTemplate{}, test.expectedReceiver) + return + } + if !test.wantError { + require.NoError(t, err) + require.Equal(t, subreceiverTemplate.receiverConfig.config, test.expectedReceiver.receiverConfig.config) + require.Equal(t, subreceiverTemplate.signals, test.expectedReceiver.signals) + require.Equal(t, subreceiverTemplate.id, test.expectedReceiver.id) + } else { + require.Error(t, err) + } + }) + } +} + +func TestGetConfFromAnnotations(t *testing.T) { + config := ` +endpoint: "0.0.0.0:8080" +collection_interval: "20s" +initial_delay: "20s" +read_buffer_size: "10" +nested_example: + foo: bar` + configNoEndpoint := ` +collection_interval: "20s" +initial_delay: "20s" +read_buffer_size: "10" +nested_example: + foo: bar` + tests := map[string]struct { + hintsAnn map[string]string + expectedConf userConfigMap + defaultEndpoint string + scopeSuffix string + expectError bool + }{ + "simple_annotation_case": { + hintsAnn: map[string]string{ + "io.opentelemetry.discovery.metrics/enabled": "true", + "io.opentelemetry.discovery.metrics/config": config, + }, expectedConf: userConfigMap{ + "collection_interval": "20s", + "endpoint": "0.0.0.0:8080", + "initial_delay": "20s", + "read_buffer_size": "10", + "nested_example": userConfigMap{"foo": "bar"}, + }, defaultEndpoint: "0.0.0.0:8080", + scopeSuffix: "", + }, "simple_annotation_case_default_endpoint": { + hintsAnn: map[string]string{ + "io.opentelemetry.discovery.metrics/enabled": "true", + "io.opentelemetry.discovery.metrics/config": configNoEndpoint, + }, expectedConf: userConfigMap{ + "collection_interval": "20s", + "endpoint": "1.1.1.1:8080", + "initial_delay": "20s", + "read_buffer_size": "10", + "nested_example": userConfigMap{"foo": "bar"}, + }, defaultEndpoint: "1.1.1.1:8080", + scopeSuffix: "", + }, "simple_annotation_case_scoped": { + hintsAnn: map[string]string{ + "io.opentelemetry.discovery.metrics.8080/enabled": "true", + "io.opentelemetry.discovery.metrics.8080/config": config, + }, expectedConf: userConfigMap{ + "collection_interval": "20s", + "endpoint": "0.0.0.0:8080", + "initial_delay": "20s", + "read_buffer_size": "10", + "nested_example": userConfigMap{"foo": "bar"}, + }, defaultEndpoint: "0.0.0.0:8080", + scopeSuffix: "8080", + }, "simple_annotation_case_with_invalid_endpoint": { + hintsAnn: map[string]string{ + "io.opentelemetry.discovery.metrics/enabled": "true", + "io.opentelemetry.discovery.metrics/config": config, + }, expectedConf: userConfigMap{}, + defaultEndpoint: "1.2.3.4:8080", + scopeSuffix: "", + expectError: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + conf, err := getScraperConfFromAnnotations(test.hintsAnn, test.defaultEndpoint, test.scopeSuffix, zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))) + if test.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal( + t, + test.expectedConf, + conf) + } + }) + } +} + +func TestDiscoveryMetricsEnabled(t *testing.T) { + config := ` +endpoint: "0.0.0.0:8080"` + tests := map[string]struct { + hintsAnn map[string]string + expected bool + scopeSuffix string + }{ + "test_enabled": { + hintsAnn: map[string]string{ + "io.opentelemetry.discovery.metrics/config": config, + "io.opentelemetry.discovery.metrics/enabled": "true", + }, + expected: true, + scopeSuffix: "", + }, "test_disabled": { + hintsAnn: map[string]string{ + "io.opentelemetry.discovery.metrics/config": config, + "io.opentelemetry.discovery.metrics/enabled": "false", + }, + expected: false, + scopeSuffix: "", + }, "test_enabled_scope": { + hintsAnn: map[string]string{ + "io.opentelemetry.discovery.metrics/config": config, + "io.opentelemetry.discovery.metrics.8080/enabled": "true", + }, + expected: true, + scopeSuffix: "8080", + }, "test_disabled_scoped": { + hintsAnn: map[string]string{ + "io.opentelemetry.discovery.metrics/config": config, + "io.opentelemetry.discovery.metrics.8080/enabled": "false", + }, + expected: false, + scopeSuffix: "8080", + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + assert.Equal( + t, + test.expected, + discoveryMetricsEnabled(test.hintsAnn, otelMetricsHints, test.scopeSuffix), + ) + }) + } +} + +func TestValidateEndpoint(t *testing.T) { + tests := map[string]struct { + endpoint string + defaultEndpoint string + expectError bool + }{ + "test_valid": { + endpoint: "http://1.2.3.4:8080/stats", + defaultEndpoint: "1.2.3.4:8080", + expectError: false, + }, + "test_invalid": { + endpoint: "http://0.0.0.0:8080/some?foo=1.2.3.4:8080", + defaultEndpoint: "1.2.3.4:8080", + expectError: true, + }, + "test_valid_no_scheme": { + endpoint: "1.2.3.4:8080/stats", + defaultEndpoint: "1.2.3.4:8080", + expectError: false, + }, + "test_valid_no_scheme_no_path": { + endpoint: "1.2.3.4:8080", + defaultEndpoint: "1.2.3.4:8080", + expectError: false, + }, + "test_valid_no_scheme_dynamic": { + endpoint: "`endpoint`/stats", + defaultEndpoint: "1.2.3.4:8080", + expectError: false, + }, + "test_valid_dynamic": { + endpoint: "http://`endpoint`/stats", + defaultEndpoint: "1.2.3.4:8080", + expectError: false, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + err := validateEndpoint(test.endpoint, test.defaultEndpoint) + if test.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/receiver/receivercreator/fixtures_test.go b/receiver/receivercreator/fixtures_test.go index 069604d70344..7abffe5d986e 100644 --- a/receiver/receivercreator/fixtures_test.go +++ b/receiver/receivercreator/fixtures_test.go @@ -56,6 +56,28 @@ var portEndpoint = observer.Endpoint{ }, } +var config = ` +int_field: 20` + +var portEndpointWithHints = observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + otelMetricsHints + "/enabled": "true", + otelMetricsHints + "/scraper": "with_endpoint", + otelMetricsHints + "/config": config, + }, + }, + Port: 6379, + }, +} + var hostportEndpoint = observer.Endpoint{ ID: "port-1", Target: "localhost:1234", diff --git a/receiver/receivercreator/go.mod b/receiver/receivercreator/go.mod index ffde47ed455d..724006841de4 100644 --- a/receiver/receivercreator/go.mod +++ b/receiver/receivercreator/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/expr-lang/expr v1.16.9 + github.com/go-viper/mapstructure/v2 v2.2.1 github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.114.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.114.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.114.0 @@ -26,6 +27,7 @@ require ( go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -37,7 +39,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect - github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect @@ -122,7 +123,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.35.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer => ../../extension/observer diff --git a/receiver/receivercreator/observerhandler.go b/receiver/receivercreator/observerhandler.go index cdf5ed82b626..40244c8e7bfe 100644 --- a/receiver/receivercreator/observerhandler.go +++ b/receiver/receivercreator/observerhandler.go @@ -81,85 +81,28 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) { continue } - obs.params.TelemetrySettings.Logger.Debug("handling added endpoint", zap.Any("env", env)) - - for _, template := range obs.config.receiverTemplates { - if matches, e := template.rule.eval(env); e != nil { - obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(e)) - continue - } else if !matches { - continue - } - - obs.params.TelemetrySettings.Logger.Info("starting receiver", - zap.String("name", template.id.String()), - zap.String("endpoint", e.Target), - zap.String("endpoint_id", string(e.ID))) - - resolvedConfig, err := expandConfig(template.config, env) + if obs.config.Discovery.Enabled { + builder := createK8sHintsBuilder(obs.config.Discovery, obs.params.TelemetrySettings.Logger) + subreceiverTemplate, err := builder.createReceiverTemplateFromHints(env) if err != nil { - obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err)) - continue - } - obs.params.TelemetrySettings.Logger.Debug("resolved config", zap.String("receiver", template.id.String()), zap.Any("config", resolvedConfig)) - - discoveredCfg := userConfigMap{} - // If user didn't set endpoint set to default value as well as - // flag indicating we've done this for later validation. - if _, ok := resolvedConfig[endpointConfigKey]; !ok { - discoveredCfg[endpointConfigKey] = e.Target - discoveredCfg[tmpSetEndpointConfigKey] = struct{}{} + obs.params.TelemetrySettings.Logger.Error("could not extract configurations from K8s hints' annotations", zap.Any("err", err)) + break } - - // Though not necessary with contrib provided observers, nothing is stopping custom - // ones from using expr in their Target values. - discoveredConfig, err := expandConfig(discoveredCfg, env) - if err != nil { - obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err)) + if subreceiverTemplate != nil { + obs.params.TelemetrySettings.Logger.Debug("adding K8s hinted receiver", zap.Any("subreceiver", subreceiverTemplate)) + obs.startReceiver(*subreceiverTemplate, env, e) continue } + } - resAttrs := map[string]string{} - for k, v := range template.ResourceAttributes { - strVal, ok := v.(string) - if !ok { - obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v)) - continue - } - resAttrs[k] = strVal - } - - // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources - // as telemetry is emitted. - var consumer *enhancingConsumer - if consumer, err = newEnhancingConsumer( - obs.config.ResourceAttributes, - resAttrs, - env, - e, - obs.nextLogsConsumer, - obs.nextMetricsConsumer, - obs.nextTracesConsumer, - ); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) + for _, template := range obs.config.receiverTemplates { + if matches, err := template.rule.eval(env); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(err)) continue - } - - var receiver component.Component - if receiver, err = obs.runner.start( - receiverConfig{ - id: template.id, - config: resolvedConfig, - endpointID: e.ID, - }, - discoveredConfig, - consumer, - ); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) + } else if !matches { continue } - - obs.receiversByEndpointID.Put(e.ID, receiver) + obs.startReceiver(template, env, e) } } } @@ -198,3 +141,88 @@ func (obs *observerHandler) OnChange(changed []observer.Endpoint) { obs.OnRemove(changed) obs.OnAdd(changed) } + +func (obs *observerHandler) startReceiver(template receiverTemplate, env observer.EndpointEnv, e observer.Endpoint) { + obs.params.TelemetrySettings.Logger.Info("starting receiver", + zap.String("name", template.id.String()), + zap.String("endpoint", e.Target), + zap.String("endpoint_id", string(e.ID)), + zap.Any("config", template.config)) + + resolvedConfig, err := expandConfig(template.config, env) + if err != nil { + obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + discoveredCfg := userConfigMap{} + // If user didn't set endpoint set to default value as well as + // flag indicating we've done this for later validation. + if _, ok := resolvedConfig[endpointConfigKey]; !ok { + discoveredCfg[endpointConfigKey] = e.Target + discoveredCfg[tmpSetEndpointConfigKey] = struct{}{} + } + + // Though not necessary with contrib provided observers, nothing is stopping custom + // ones from using expr in their Target values. + discoveredConfig, err := expandConfig(discoveredCfg, env) + if err != nil { + obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + resAttrs := map[string]string{} + for k, v := range template.ResourceAttributes { + strVal, ok := v.(string) + if !ok { + obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v)) + continue + } + resAttrs[k] = strVal + } + + // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources + // as telemetry is emitted. + var consumer *enhancingConsumer + if consumer, err = newEnhancingConsumer( + obs.config.ResourceAttributes, + resAttrs, + env, + e, + obs.nextLogsConsumer, + obs.nextMetricsConsumer, + obs.nextTracesConsumer, + ); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + filterConsumerSignals(consumer, template.signals) + + var receiver component.Component + if receiver, err = obs.runner.start( + receiverConfig{ + id: template.id, + config: resolvedConfig, + endpointID: e.ID, + }, + discoveredConfig, + consumer, + ); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + obs.receiversByEndpointID.Put(e.ID, receiver) +} + +func filterConsumerSignals(consumer *enhancingConsumer, signals receiverSignals) { + if !signals.metrics { + consumer.metrics = nil + } + if !signals.logs { + consumer.logs = nil + } + if !signals.metrics { + consumer.traces = nil + } +} diff --git a/receiver/receivercreator/observerhandler_test.go b/receiver/receivercreator/observerhandler_test.go index 14cd5e7a7c97..8b91da5064a2 100644 --- a/receiver/receivercreator/observerhandler_test.go +++ b/receiver/receivercreator/observerhandler_test.go @@ -78,6 +78,7 @@ func TestOnAddForMetrics(t *testing.T) { rule: portRule, Rule: `type == "port"`, ResourceAttributes: map[string]any{}, + signals: receiverSignals{true, true, true}, }, } @@ -121,6 +122,66 @@ func TestOnAddForMetrics(t *testing.T) { } } +func TestOnAddForMetricsWithHints(t *testing.T) { + for _, test := range []struct { + name string + expectedReceiverType component.Component + expectedReceiverConfig component.Config + expectedError string + }{ + { + name: "dynamically set with supported endpoint", + expectedReceiverType: &nopWithEndpointReceiver{}, + expectedReceiverConfig: &nopWithEndpointConfig{ + IntField: 20, + Endpoint: "1.2.3.4:6379", + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Discovery.Enabled = true + + handler, mr := newObserverHandler(t, cfg, nil, consumertest.NewNop(), nil) + handler.OnAdd([]observer.Endpoint{ + portEndpointWithHints, + unsupportedEndpoint, + }) + + if test.expectedError != "" { + assert.Equal(t, 0, handler.receiversByEndpointID.Size()) + require.Error(t, mr.lastError) + require.ErrorContains(t, mr.lastError, test.expectedError) + require.Nil(t, mr.startedComponent) + return + } + + assert.Equal(t, 1, handler.receiversByEndpointID.Size()) + require.NoError(t, mr.lastError) + require.NotNil(t, mr.startedComponent) + + wr, ok := mr.startedComponent.(*wrappedReceiver) + require.True(t, ok) + + require.Nil(t, wr.logs) + require.Nil(t, wr.traces) + + var actualConfig component.Config + switch v := wr.metrics.(type) { + case *nopWithEndpointReceiver: + require.NotNil(t, v) + actualConfig = v.cfg + case *nopWithoutEndpointReceiver: + require.NotNil(t, v) + actualConfig = v.cfg + default: + t.Fatalf("unexpected startedComponent: %T", v) + } + require.Equal(t, test.expectedReceiverConfig, actualConfig) + }) + } +} + func TestOnAddForLogs(t *testing.T) { for _, test := range []struct { name string @@ -180,6 +241,7 @@ func TestOnAddForLogs(t *testing.T) { rule: portRule, Rule: `type == "port"`, ResourceAttributes: map[string]any{}, + signals: receiverSignals{metrics: true, logs: true, traces: true}, }, } @@ -282,6 +344,7 @@ func TestOnAddForTraces(t *testing.T) { rule: portRule, Rule: `type == "port"`, ResourceAttributes: map[string]any{}, + signals: receiverSignals{metrics: true, logs: true, traces: true}, }, } @@ -338,6 +401,7 @@ func TestOnRemoveForMetrics(t *testing.T) { rule: portRule, Rule: `type == "port"`, ResourceAttributes: map[string]any{}, + signals: receiverSignals{metrics: true, logs: true, traces: true}, }, } handler, r := newObserverHandler(t, cfg, nil, consumertest.NewNop(), nil) @@ -367,6 +431,7 @@ func TestOnRemoveForLogs(t *testing.T) { rule: portRule, Rule: `type == "port"`, ResourceAttributes: map[string]any{}, + signals: receiverSignals{metrics: true, logs: true, traces: true}, }, } handler, r := newObserverHandler(t, cfg, consumertest.NewNop(), nil, nil) @@ -396,6 +461,7 @@ func TestOnChange(t *testing.T) { rule: portRule, Rule: `type == "port"`, ResourceAttributes: map[string]any{}, + signals: receiverSignals{metrics: true, logs: true, traces: true}, }, } handler, r := newObserverHandler(t, cfg, nil, consumertest.NewNop(), nil)