Skip to content

Commit

Permalink
[receiver/discovery] Update entity events ID fields
Browse files Browse the repository at this point in the history
Update entity events ID fields according to the latest contract with the inventory service:
- `discovery.endpoint.id` is moved from identifying to regular attributes
- The following attributes now marked as identifying instead:
  - `k8s.pod.uid`
  - `k8s.node.uid`
  - `container.id`
  - `source.port`
  - `host.id`
  • Loading branch information
dmitryax committed Apr 28, 2024
1 parent 460148c commit 80ff86c
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 106 deletions.
8 changes: 1 addition & 7 deletions internal/confmapprovider/discovery/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,14 +708,8 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
}
}

entityIDAttr, ok := lr.Attributes().Get(discovery.OtelEntityIDAttr)
if !ok {
d.logger.Debug("invalid entity event without id", zap.Any("log record", lr))
continue
}

endpointID := "unavailable"
if eid, k := entityIDAttr.Map().Get(discovery.EndpointIDAttr); k {
if eid, k := entityAttrs.Get(discovery.EndpointIDAttr); k {
endpointID = eid.AsString()
}

Expand Down
100 changes: 81 additions & 19 deletions internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/collector/semconv/v1.22.0"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

var (
_ observer.Notify = (*notify)(nil)
)
const sourcePortAttr = "source.port"

// identifyingAttrKeys are the keys of attributes that are used to identify an entity.
var identifyingAttrKeys = []string{
semconv.AttributeK8SPodUID,
semconv.AttributeContainerID,
semconv.AttributeK8SNodeUID,
semconv.AttributeHostID,
sourcePortAttr,
}

var _ observer.Notify = (*notify)(nil)

type endpointTracker struct {
correlations correlationStore
Expand Down Expand Up @@ -118,7 +128,7 @@ func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpo
if et.pLogs != nil {
entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, et.correlations, time.Now())
if err != nil {
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to log records", numFailed), zap.Error(err))
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity state events", numFailed), zap.Error(err))
}
if entityEvents.Len() > 0 {
et.pLogs <- entityEvents.ConvertAndMoveToLogs()
Expand All @@ -128,7 +138,10 @@ func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpo

func (et *endpointTracker) emitEntityDeleteEvents(endpoints []observer.Endpoint) {
if et.pLogs != nil {
entityEvents := entityDeleteEvents(endpoints, time.Now())
entityEvents, numFailed, err := entityDeleteEvents(endpoints, time.Now())
if err != nil {
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity delete events", numFailed), zap.Error(err))
}
if entityEvents.Len() > 0 {
et.pLogs <- entityEvents.ConvertAndMoveToLogs()
}
Expand Down Expand Up @@ -214,40 +227,56 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
if endpoint.Details == nil {
failed++
err = multierr.Combine(err, fmt.Errorf("endpoint %q has no details", endpoint.ID))
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
entityState := entityEvent.SetEntityState()
attrs := entityState.Attributes()
if endpoint.Details != nil {
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
}
attrs.PutStr("type", string(endpoint.Details.Type()))
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
}
attrs.PutStr("type", string(endpoint.Details.Type()))
attrs.PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr(observerNameAttr, observerID.Name())
attrs.PutStr(observerTypeAttr, observerID.Type().String())
for k, v := range correlations.Attrs(endpoint.ID) {
attrs.PutStr(k, v)
}
extractIdentifyingAttrs(attrs, entityEvent.ID())
}
return entityEvents, failed, err
}

func entityDeleteEvents(endpoints []observer.Endpoint, ts time.Time) experimentalmetricmetadata.EntityEventsSlice {
func entityDeleteEvents(endpoints []observer.Endpoint, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
if endpoint.Details == nil {
failed++
err = multierr.Combine(err, fmt.Errorf("endpoint %q has no details", endpoint.ID))
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
entityEvent.SetEntityDelete()
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
extractIdentifyingAttrs(envAttrs, entityEvent.ID())
}
}
return entityEvents
return entityEvents, failed, err
}

func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer.EndpointEnv) (pcommon.Map, error) {
Expand All @@ -273,10 +302,34 @@ func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer
if e != nil {
return attrs, fmt.Errorf("failed parsing %v pod attributes ", endpointType)
}
podAttrs.CopyTo(attrs.PutEmptyMap(k))
podAttrs.Range(func(k string, v pcommon.Value) bool {
v.CopyTo(attrs.PutEmpty(k))
return true
})
} else {
return attrs, fmt.Errorf("failed parsing %v pod env %#v", endpointType, v)
}

// rename keys according to the OTel Semantic Conventions
case k == "container_id":
attrs.PutEmpty(semconv.AttributeContainerID).FromRaw(v)
case k == "port":
attrs.PutEmpty(sourcePortAttr).FromRaw(v)
case endpointType == observer.PodType:
if k == "namespace" {
attrs.PutEmpty(semconv.AttributeK8SNamespaceName).FromRaw(v)
} else {
attrs.PutEmpty("k8s.pod." + k).FromRaw(v)
}
case endpointType == observer.K8sNodeType:
switch k {
case "name":
attrs.PutEmpty(semconv.AttributeK8SNodeName).FromRaw(v)
case "uid":
attrs.PutEmpty(semconv.AttributeK8SNodeUID).FromRaw(v)
default:
attrs.PutEmpty(k).FromRaw(v)
}
default:
switch vVal := v.(type) {
case uint16:
Expand All @@ -298,3 +351,12 @@ func shouldEmbedMap(endpointType observer.EndpointType, k string) bool {
endpointType == observer.ContainerType ||
endpointType == observer.K8sNodeType))
}

func extractIdentifyingAttrs(from pcommon.Map, to pcommon.Map) {
for _, k := range identifyingAttrKeys {
if v, ok := from.Get(k); ok {
v.CopyTo(to.PutEmpty(k))
from.Remove(k)
}
}
}
Loading

0 comments on commit 80ff86c

Please sign in to comment.