Skip to content

Commit

Permalink
[exporter/signalfx] Use "unknown" env correlation value as fallback
Browse files Browse the repository at this point in the history
Use the same fallback value for the metrics/tracing environment correlation calls as being set by the backend on the traces. This fixed the APM/IM correlation in the Splunk Observability UI for the users that send traces with no "deployment.environment" resource attribute value set.
  • Loading branch information
dmitryax committed Feb 5, 2024
1 parent e2d601a commit 43d56e8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 68 deletions.
29 changes: 29 additions & 0 deletions .chloggen/signalfx-exp-env-correlation-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/signalfx

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use "unknown" value for the environment correlation calls as fallback.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31052]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
This fixed the APM/IM correlation in the Splunk Observability UI for the users that send traces with no
"deployment.environment" resource attribute value set.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
60 changes: 6 additions & 54 deletions exporter/signalfxexporter/internal/apm/tracetracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/log"
)

// fallbackEnvironment is the environment value to use if no environment is found in the span.
// This is the same value that is being set on the backend on spans that don't have an environment.
const fallbackEnvironment = "unknown"

// DefaultDimsToSyncSource are the default dimensions to sync correlated environment and services onto.
var DefaultDimsToSyncSource = map[string]string{
"container_id": "container_id",
Expand Down Expand Up @@ -168,61 +172,9 @@ func (a *ActiveServiceTracker) processEnvironment(span Span, now time.Time) {
}
environment, environmentFound := span.Environment()

// If spans are coming in with no environment, we use the same fallback value that is being set on the backend.
if !environmentFound || strings.TrimSpace(environment) == "" {
// The following is ONLY to mitigate a corner case scenario where the environment for a container/pod is set on
// the backend with an old default environment set by the agent, and the agent has been restarted with no
// default environment. On restart, the agent only fetches existing environment values for hostIDDims, and does
// not fetch for containers/pod dims. If a container/pod is emitting spans without an environment value, then
// the agent won't be able to overwrite the value. The agent is also unable to age out environment values for
// containers/pods from startup.
//
// Under that VERY specific circumstance, we need to fetch and delete the environment values for each
// pod/container that we have not already scraped an environment off of this agent runtime.
for sourceAttr, dimName := range a.dimsToSyncSource {
sourceAttr := sourceAttr
dimName := dimName
if dimValue, ok := span.Tag(sourceAttr); ok {
// look up the dimension / value in the environment cache to ensure it doesn't already exist
// if it does exist, this means we've already scraped and overwritten what was on the backend
// probably from another span. This also implies that some spans for the tenant have an environment
// and some do not.
a.tenantEnvironmentCache.RunIfKeyDoesNotExist(&CacheKey{dimName: dimName, dimValue: dimValue}, func() {
// create a cache key ensuring that we don't fetch environment values multiple times for spans with
// empty environments
if isNew := a.tenantEmptyEnvironmentCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: dimValue}, now); isNew {
// get the existing value from the backend
a.correlationClient.Get(dimName, dimValue, func(response map[string][]string) {
if len(response) == 0 {
return
}

// look for the existing environment value
environments, ok := response["sf_environments"]
if !ok || len(environments) == 0 {
return
}

// Note: This cache operation is OK to execute inside of the encapsulating
// tenantEnvironmentCache.RunIfKeyDoesNotExist() because it is actually inside an
// asynchronous callback to the correlation client's Get(). So... by the time the callback
// is actually executed, the parent RunIfKeyDoesNotExist will have already released the lock
// on the cache
a.tenantEnvironmentCache.RunIfKeyDoesNotExist(&CacheKey{dimName: dimName, dimValue: dimValue}, func() {
a.correlationClient.Delete(&correlations.Correlation{
Type: correlations.Environment,
DimName: dimName,
DimValue: dimValue,
Value: environments[0], // We already checked for empty, and backend enforces 1 value max.
}, func(_ *correlations.Correlation) {})
})
})
}
})
}
}

// return so we don't set empty string or spaces as an environment value
return
environment = fallbackEnvironment
}

// update the environment for the hostIDDims
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,26 +148,20 @@ func TestCorrelationEmptyEnvironment(t *testing.T) {
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, true, DefaultDimsToSyncSource)
wg.Wait() // wait for the initial fetch of hostIDDims to complete

// for each container level ID we're going to perform a GET to check for an environment
wg.Add(len(containerLevelIDDims))
a.AddSpansGeneric(context.Background(), fakeSpanList{
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
})

wg.Wait() // wait for the gets to complete to check for existing tenant environment values

// there shouldn't be any active tenant environments. None of the spans had environments on them,
// and we don't actively fetch and store environments from the back end. That's kind of the whole point of this
// the workaround this is testing.
assert.Equal(t, int64(0), a.tenantEnvironmentCache.ActiveCount, "tenantEnvironmentCache is not properly tracked")
// ensure we only have 1 entry per container / pod id
assert.Equal(t, int64(len(containerLevelIDDims)), a.tenantEmptyEnvironmentCache.ActiveCount, "tenantEmptyEnvironmentCount is not properly tracked")
// len(hostIDDims) * len(containerLevelIDDims)
assert.Equal(t, int64(len(containerLevelIDDims)+len(hostIDDims)), atomic.LoadInt64(&correlationClient.getCounter), "")
// 1 DELETE * len(containerLevelIDDims)
assert.Equal(t, len(containerLevelIDDims), len(correlationClient.getCorrelations()), "")
cors := correlationClient.getCorrelations()
assert.Equal(t, 4, len(cors), "expected 4 correlations to be made")
for _, c := range cors {
assert.Contains(t, []string{"container_id", "kubernetes_pod_uid", "host", "AWSUniqueId"}, c.DimName)
assert.Contains(t, []string{"test", "randomAWSUniqueId", "testk8sPodUID", "testContainerID"}, c.DimValue)
assert.Equal(t, correlations.Type("environment"), c.Type)
assert.Equal(t, fallbackEnvironment, c.Value)
}
}

func TestCorrelationUpdates(t *testing.T) {
Expand Down

0 comments on commit 43d56e8

Please sign in to comment.