Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver_creator] add receiver-specific resource attributes #11766

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions receiver/receivercreator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ config:
endpoint: '`endpoint`:8080'
```

**receivers.<receiver_type/id>.resource_attributes**
**receivers.resource_attributes**

```yaml
resource_attributes:
<endpoint type>:
<attribute>: <attribute value>
```

This setting controls what resource attributes are set on metrics emitted from the created receiver. These attributes can be set from [values in the endpoint](#rule-expressions) that was matched by the `rule`. These attributes vary based on the endpoint type. These defaults can be disabled by setting the attribute to be removed to an empty value. Note that the values can be dynamic and processed the same as in `config`.

Expand Down Expand Up @@ -98,6 +104,18 @@ None

See `redis/2` in [examples](#examples).


**receivers.&lt;receiver_type/id&gt;.resource_attributes**

```yaml
receivers:
<receiver_type>:
resource_attributes:
<attribute>: <attribute string value>
```

Similar to the per-endpoint type `resource_attributes` described above but for individual receiver instances. Duplicate attribute entries (including the empty string) in this receiver-specific mapping take precedence. These attribute values also support expansion from endpoint environment content. At this time their values must be strings.

## Rule Expressions

Each rule must start with `type == ("pod"|"port"|"hostport"|"container") &&` such that the rule matches
Expand Down Expand Up @@ -190,6 +208,10 @@ receivers:
config:
metrics_path: '`"prometheus.io/path" in annotations ? annotations["prometheus.io/path"] : "/metrics"`'
endpoint: '`endpoint`:`"prometheus.io/port" in annotations ? annotations["prometheus.io/port"] : 9090`'
resource_attributes:
an.attribute: a.value
# Dynamic configuration values
app.version: '`labels["app_version"]`'

redis/1:
# If this rule matches an instance of this receiver will be started.
Expand All @@ -205,18 +227,22 @@ receivers:
rule: type == "port" && port == 6379

resource_attributes:
# Dynamic configuration values
service.name: '`pod.labels["service_name"]`'
app: '`pod.labels["app"]`'
# Dynamic configuration values, overwriting default attributes`
pod:
service.name: '`labels["service_name"]`'
app: '`labels["app"]`'
port:
service.name: '`pod.labels["service_name"]`'
app: '`pod.labels["app"]`'
receiver_creator/2:
# Name of the extensions to watch for endpoints to start and stop.
watch_observers: [host_observer]
receivers:
redis/on_host:
# If this rule matches an instance of this receiver will be started.
rule: type == "port" && port == 6379 && is_ipv6 == true
resource_attributes:
service.name: redis_on_host
resource_attributes:
service.name: redis_on_host
receiver_creator/3:
watch_observers: [k8s_observer]
receivers:
Expand Down
19 changes: 18 additions & 1 deletion receiver/receivercreator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ type receiverTemplate struct {
// Rule is the discovery rule that when matched will create a receiver instance
// based on receiverTemplate.
Rule string `mapstructure:"rule"`
rule rule
// ResourceAttributes is a map of resource attributes to add to just this receiver's resource metrics.
// It can contain expr expressions for endpoint env value expansion
ResourceAttributes map[string]interface{} `mapstructure:"resource_attributes"`
rule rule
}

// resourceAttributes holds a map of default resource attributes for each Endpoint type.
Expand Down Expand Up @@ -97,6 +100,14 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
return err
}

for endpointType := range cfg.ResourceAttributes {
switch endpointType {
case observer.ContainerType, observer.HostPortType, observer.K8sNodeType, observer.PodType, observer.PortType:
default:
return fmt.Errorf("resource attributes for unsupported endpoint type %q", endpointType)
}
}

receiversCfg, err := componentParser.Sub(receiversConfigKey)
if err != nil {
return fmt.Errorf("unable to extract key %v: %w", receiversConfigKey, err)
Expand All @@ -123,6 +134,12 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error {
return fmt.Errorf("subreceiver %q rule is invalid: %w", subreceiverKey, err)
}

for k, v := range subreceiver.ResourceAttributes {
if _, ok := v.(string); !ok {
return fmt.Errorf("unsupported `resource_attributes` %q value %v in %s", k, v, subreceiverKey)
}
}

cfg.receiverTemplates[subreceiverKey] = subreceiver
}

Expand Down
26 changes: 26 additions & 0 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,32 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, []config.Type{"mock_observer"}, r1.WatchObservers)
}

func TestInvalidResourceAttributeEndpointType(t *testing.T) {
factories, err := componenttest.NopFactories()
require.Nil(t, err)

factories.Receivers[("nop")] = &nopWithEndpointFactory{ReceiverFactory: componenttest.NewNopReceiverFactory()}

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid-resource-attributes.yaml"), factories)
require.EqualError(t, err, "error reading receivers configuration for \"receiver_creator\": resource attributes for unsupported endpoint type \"not.a.real.type\"")
require.Nil(t, cfg)
}

func TestInvalidReceiverResourceAttributeValueType(t *testing.T) {
factories, err := componenttest.NopFactories()
require.Nil(t, err)

factories.Receivers[("nop")] = &nopWithEndpointFactory{ReceiverFactory: componenttest.NewNopReceiverFactory()}

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid-receiver-resource-attributes.yaml"), factories)
require.EqualError(t, err, "error reading receivers configuration for \"receiver_creator\": unsupported `resource_attributes` \"one\" value <nil> in examplereceiver/1")
require.Nil(t, cfg)
}

type nopWithEndpointConfig struct {
config.ReceiverSettings `mapstructure:",squash"`
Endpoint string `mapstructure:"endpoint"`
Expand Down
11 changes: 11 additions & 0 deletions receiver/receivercreator/observerhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,21 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) {
continue
}

resAttrs := map[string]string{}
for k, v := range template.ResourceAttributes {
strVal, ok := v.(string)
if !ok {
obs.logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v))
continue
}
resAttrs[k] = strVal
}

// Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources
// as telemetry is emitted.
resourceEnhancer, err := newResourceEnhancer(
obs.config.ResourceAttributes,
resAttrs,
env,
e,
obs.nextConsumer,
Expand Down
4 changes: 2 additions & 2 deletions receiver/receivercreator/observerhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestOnAdd(t *testing.T) {
rcvrCfg := receiverConfig{id: config.NewComponentIDWithName("name", "1"), config: userConfigMap{"foo": "bar"}}
cfg := createDefaultConfig().(*Config)
cfg.receiverTemplates = map[string]receiverTemplate{
"name/1": {rcvrCfg, "", newRuleOrPanic(`type == "port"`)},
"name/1": {rcvrCfg, "", map[string]interface{}{}, newRuleOrPanic(`type == "port"`)},
}
handler := &observerHandler{
config: cfg,
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestOnChange(t *testing.T) {
newRcvr := &nopWithEndpointReceiver{}
cfg := createDefaultConfig().(*Config)
cfg.receiverTemplates = map[string]receiverTemplate{
"name/1": {rcvrCfg, "", newRuleOrPanic(`type == "port"`)},
"name/1": {rcvrCfg, "", map[string]interface{}{}, newRuleOrPanic(`type == "port"`)},
}
handler := &observerHandler{
config: cfg,
Expand Down
28 changes: 18 additions & 10 deletions receiver/receivercreator/resourceenhancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,31 @@ type resourceEnhancer struct {

func newResourceEnhancer(
resources resourceAttributes,
receiverAttributes map[string]string,
env observer.EndpointEnv,
endpoint observer.Endpoint,
nextConsumer consumer.Metrics,
) (*resourceEnhancer, error) {
attrs := map[string]string{}

// Precompute values that will be inserted for each resource object passed through.
for attr, expr := range resources[endpoint.Details.Type()] {
res, err := evalBackticksInConfigValue(expr, env)
if err != nil {
return nil, fmt.Errorf("failed processing resource attribute %q for endpoint %v: %w", attr, endpoint.ID, err)
}
for _, resource := range []map[string]string{resources[endpoint.Details.Type()], receiverAttributes} {
// Precompute values that will be inserted for each resource object passed through.
for attr, expr := range resource {
// If the attribute value is empty this signals to delete existing
if expr == "" {
delete(attrs, attr)
continue
}
Comment on lines +50 to +53
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have similar functionality where we use nil to delete attributes. Should we probably apply the same approach here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing implementation assumes empty strings aren't desirable for attribute values and these changes are in line with that precedent. I can see how this may be overly opinionated as is but think it would be a breaking change to adopt. If we want it I'd prefer to shift everything over to map[string]*string in a subsequent PR to keep the scope reduced and help with reducing unforeseen side effects.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with this approach as long as we don't want users to have empty string values


res, err := evalBackticksInConfigValue(expr, env)
if err != nil {
return nil, fmt.Errorf("failed processing resource attribute %q for endpoint %v: %w", attr, endpoint.ID, err)
}

// If the attribute value is empty user has likely removed the default value so skip it.
val := fmt.Sprint(res)
if val != "" {
attrs[attr] = val
val := fmt.Sprint(res)
if val != "" {
attrs[attr] = val
}
}
}

Expand Down
44 changes: 39 additions & 5 deletions receiver/receivercreator/resourceenhancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ func Test_newResourceEnhancer(t *testing.T) {

cfg := createDefaultConfig().(*Config)
type args struct {
resources resourceAttributes
env observer.EndpointEnv
endpoint observer.Endpoint
nextConsumer consumer.Metrics
resources resourceAttributes
resourceAttributes map[string]string
env observer.EndpointEnv
endpoint observer.Endpoint
nextConsumer consumer.Metrics
}
tests := []struct {
name string
Expand Down Expand Up @@ -131,6 +132,39 @@ func Test_newResourceEnhancer(t *testing.T) {
},
wantErr: false,
},
{
name: "both forms of resource attributes",
args: args{
resources: func() resourceAttributes {
res := map[observer.EndpointType]map[string]string{observer.PodType: {}}
for k, v := range cfg.ResourceAttributes[observer.PodType] {
res[observer.PodType][k] = v
}
res[observer.PodType]["duplicate.resource.attribute"] = "pod.value"
res[observer.PodType]["delete.me"] = "pod.value"
return res
}(),
resourceAttributes: map[string]string{
"expanded.resource.attribute": "`'labels' in pod ? pod.labels['region'] : labels['region']`",
"duplicate.resource.attribute": "receiver.value",
"delete.me": "",
},
env: podEnv,
endpoint: podEndpoint,
nextConsumer: nil,
},
want: &resourceEnhancer{
nextConsumer: nil,
attrs: map[string]string{
"k8s.namespace.name": "default",
"k8s.pod.name": "pod-1",
"k8s.pod.uid": "uid-1",
"duplicate.resource.attribute": "receiver.value",
"expanded.resource.attribute": "west-1",
},
},
wantErr: false,
},
{
name: "error",
args: args{
Expand All @@ -149,7 +183,7 @@ func Test_newResourceEnhancer(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := newResourceEnhancer(tt.args.resources, tt.args.env, tt.args.endpoint, tt.args.nextConsumer)
got, err := newResourceEnhancer(tt.args.resources, tt.args.resourceAttributes, tt.args.env, tt.args.endpoint, tt.args.nextConsumer)
if (err != nil) != tt.wantErr {
t.Errorf("newResourceEnhancer() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
17 changes: 17 additions & 0 deletions receiver/receivercreator/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,27 @@ receivers:
receivers:
examplereceiver/1:
rule: type == "port"
config:
key: value
resource_attributes:
one: two
nop/1:
rule: type == "port"
config:
endpoint: localhost:12345
resource_attributes:
two: three
resource_attributes:
container:
container.key: container.value
pod:
pod.key: pod.value
port:
port.key: port.value
hostport:
hostport.key: hostport.value
k8s.node:
k8s.node.key: k8s.node.value

processors:
nop:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
receivers:
receiver_creator:
watch_observers: [mock_observer]
receivers:
examplereceiver/1:
rule: type == "port"
config:
key: value
resource_attributes:
one: null

processors:
nop:

exporters:
nop:

service:
pipelines:
metrics:
receivers: [receiver_creator]
processors: [nop]
exporters: [nop]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
receivers:
receiver_creator:
watch_observers: [mock_observer]
receivers:
examplereceiver/1:
rule: type == "port"
config:
key: value
resource_attributes:
one: two
resource_attributes:
k8s.node:
k8s.node.key: k8s.node.value
not.a.real.type:
not: real

processors:
nop:

exporters:
nop:

service:
pipelines:
metrics:
receivers: [receiver_creator]
processors: [nop]
exporters: [nop]
4 changes: 4 additions & 0 deletions unreleased/receivercreatorresourceattributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
change_type: enhancement
component: receivercreator
note: add per-receiver `resource_attribute` and validate endpoint type keys on global
issues: [11766]