Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/discovery] Update entity events ID fields #4739

Merged
merged 1 commit into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions internal/confmapprovider/discovery/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,14 +708,8 @@ func (d *discoverer) ConsumeLogs(_ context.Context, ld plog.Logs) error {
}
}

entityIDAttr, ok := lr.Attributes().Get(discovery.OtelEntityIDAttr)
if !ok {
d.logger.Debug("invalid entity event without id", zap.Any("log record", lr))
continue
}

endpointID := "unavailable"
if eid, k := entityIDAttr.Map().Get(discovery.EndpointIDAttr); k {
if eid, k := entityAttrs.Get(discovery.EndpointIDAttr); k {
endpointID = eid.AsString()
}

Expand Down
100 changes: 81 additions & 19 deletions internal/receiver/discoveryreceiver/endpoint_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/collector/semconv/v1.22.0"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/signalfx/splunk-otel-collector/internal/common/discovery"
)

var (
_ observer.Notify = (*notify)(nil)
)
const sourcePortAttr = "source.port"

// identifyingAttrKeys are the keys of attributes that are used to identify an entity.
var identifyingAttrKeys = []string{
semconv.AttributeK8SPodUID,
semconv.AttributeContainerID,
semconv.AttributeK8SNodeUID,
semconv.AttributeHostID,
sourcePortAttr,
}

var _ observer.Notify = (*notify)(nil)

type endpointTracker struct {
correlations correlationStore
Expand Down Expand Up @@ -118,7 +128,7 @@ func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpo
if et.pLogs != nil {
entityEvents, numFailed, err := entityStateEvents(observerCID, endpoints, et.correlations, time.Now())
if err != nil {
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to log records", numFailed), zap.Error(err))
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity state events", numFailed), zap.Error(err))
}
if entityEvents.Len() > 0 {
et.pLogs <- entityEvents.ConvertAndMoveToLogs()
Expand All @@ -128,7 +138,10 @@ func (et *endpointTracker) emitEntityStateEvents(observerCID component.ID, endpo

func (et *endpointTracker) emitEntityDeleteEvents(endpoints []observer.Endpoint) {
if et.pLogs != nil {
entityEvents := entityDeleteEvents(endpoints, time.Now())
entityEvents, numFailed, err := entityDeleteEvents(endpoints, time.Now())
if err != nil {
et.logger.Warn(fmt.Sprintf("failed converting %v endpoints to entity delete events", numFailed), zap.Error(err))
}
if entityEvents.Len() > 0 {
et.pLogs <- entityEvents.ConvertAndMoveToLogs()
}
Expand Down Expand Up @@ -214,40 +227,56 @@ func (n *notify) OnChange(changed []observer.Endpoint) {
func entityStateEvents(observerID component.ID, endpoints []observer.Endpoint, correlations correlationStore, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
if endpoint.Details == nil {
failed++
err = multierr.Combine(err, fmt.Errorf("endpoint %q has no details", endpoint.ID))
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
entityState := entityEvent.SetEntityState()
attrs := entityState.Attributes()
if endpoint.Details != nil {
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
}
attrs.PutStr("type", string(endpoint.Details.Type()))
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
// this must be the first mutation of attrs since it's destructive
envAttrs.CopyTo(attrs)
}
attrs.PutStr("type", string(endpoint.Details.Type()))
attrs.PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
attrs.PutStr("endpoint", endpoint.Target)
attrs.PutStr(observerNameAttr, observerID.Name())
attrs.PutStr(observerTypeAttr, observerID.Type().String())
for k, v := range correlations.Attrs(endpoint.ID) {
attrs.PutStr(k, v)
}
extractIdentifyingAttrs(attrs, entityEvent.ID())
}
return entityEvents, failed, err
}

func entityDeleteEvents(endpoints []observer.Endpoint, ts time.Time) experimentalmetricmetadata.EntityEventsSlice {
func entityDeleteEvents(endpoints []observer.Endpoint, ts time.Time) (ees experimentalmetricmetadata.EntityEventsSlice, failed int, err error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need the failed (int) value? Is it a return code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for logging. We do the same for emitEntityStateEvents, so I kept it the same, but I agree it's not ideal. We can probably move the logging inside the funcs. I can address it separately

entityEvents := experimentalmetricmetadata.NewEntityEventsSlice()
for _, endpoint := range endpoints {
if endpoint.Details == nil {
failed++
err = multierr.Combine(err, fmt.Errorf("endpoint %q has no details", endpoint.ID))
continue
}

entityEvent := entityEvents.AppendEmpty()
entityEvent.SetTimestamp(pcommon.NewTimestampFromTime(ts))
entityEvent.ID().PutStr(discovery.EndpointIDAttr, string(endpoint.ID))
entityEvent.SetEntityDelete()
if envAttrs, e := endpointEnvToAttrs(endpoint.Details.Type(), endpoint.Details.Env()); e != nil {
err = multierr.Combine(err, fmt.Errorf("failed determining attributes for %q: %w", endpoint.ID, e))
failed++
} else {
extractIdentifyingAttrs(envAttrs, entityEvent.ID())
}
}
return entityEvents
return entityEvents, failed, err
}

func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer.EndpointEnv) (pcommon.Map, error) {
Expand All @@ -273,10 +302,34 @@ func endpointEnvToAttrs(endpointType observer.EndpointType, endpointEnv observer
if e != nil {
return attrs, fmt.Errorf("failed parsing %v pod attributes ", endpointType)
}
podAttrs.CopyTo(attrs.PutEmptyMap(k))
podAttrs.Range(func(k string, v pcommon.Value) bool {
v.CopyTo(attrs.PutEmpty(k))
return true
})
} else {
return attrs, fmt.Errorf("failed parsing %v pod env %#v", endpointType, v)
}

// rename keys according to the OTel Semantic Conventions
case k == "container_id":
attrs.PutEmpty(semconv.AttributeContainerID).FromRaw(v)
case k == "port":
attrs.PutEmpty(sourcePortAttr).FromRaw(v)
case endpointType == observer.PodType:
if k == "namespace" {
attrs.PutEmpty(semconv.AttributeK8SNamespaceName).FromRaw(v)
} else {
attrs.PutEmpty("k8s.pod." + k).FromRaw(v)
}
case endpointType == observer.K8sNodeType:
switch k {
case "name":
attrs.PutEmpty(semconv.AttributeK8SNodeName).FromRaw(v)
case "uid":
attrs.PutEmpty(semconv.AttributeK8SNodeUID).FromRaw(v)
default:
attrs.PutEmpty(k).FromRaw(v)
}
default:
switch vVal := v.(type) {
case uint16:
Expand All @@ -298,3 +351,12 @@ func shouldEmbedMap(endpointType observer.EndpointType, k string) bool {
endpointType == observer.ContainerType ||
endpointType == observer.K8sNodeType))
}

func extractIdentifyingAttrs(from pcommon.Map, to pcommon.Map) {
for _, k := range identifyingAttrKeys {
if v, ok := from.Get(k); ok {
v.CopyTo(to.PutEmpty(k))
from.Remove(k)
}
}
}
Loading
Loading