[processor/spanmetrics] Resource attributes support #7075
Conversation
- Use LRU cache for the "resource_attributes".
- Add document/usage for the "resource_attributes" configuration.
defaultDimensionsCacheSize = 1000
serviceNameKey = conventions.AttributeServiceName
instrumentationLibraryName = "spanmetricsprocessor"
operationKey = "operation" // OpenTelemetry non-standard constant.
The attributes used in this processor aren't standard; that's something we might address at some point. For example, "status_code" should probably be "http.status_code", if I'm not mistaken.
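For illustration, if the conventions package already imported in this file defines a constant for that key (an assumption on my part), the swap could look roughly like:

// Hypothetical sketch: prefer a semantic-convention key over an ad-hoc one.
// Assumes the imported conventions package defines AttributeHTTPStatusCode ("http.status_code").
statusCodeKey = conventions.AttributeHTTPStatusCode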
latencyCount map[metricKey]uint64
latencySum map[metricKey]float64
latencyBucketCounts map[metricKey][]uint64
latencyCount map[resourceKey]map[metricKey]uint64
Just a thought: I can't help but think this processor is slowly reinventing a lot of what the metrics SDK is already supposed to be doing. I wonder if we can simplify it by using metric instruments to record new values and let the SDK do the rest? It would simplify this processor massively. I haven't looked into it enough to see whether it would be possible, though.
That is a valid point :)
Not sure on the limitations of this but it is worth exploring.
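Purely as an illustration of that idea, a sketch using the current otel-go metric API (the package paths, instrument name, and attribute keys here are assumptions, not what this processor does today):

import (
    "context"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/metric"
)

var latencyHistogram metric.Float64Histogram

// setupInstruments creates the histogram once; the SDK then owns bucketing,
// summing, and counting, which this processor currently does by hand.
func setupInstruments() error {
    var err error
    latencyHistogram, err = otel.Meter("spanmetricsprocessor").Float64Histogram("latency")
    return err
}

// recordLatency records one span's latency; aggregation is left to the SDK.
func recordLatency(ctx context.Context, latencyMs float64, service, operation string) {
    latencyHistogram.Record(ctx, latencyMs, metric.WithAttributes(
        attribute.String("service.name", service),
        attribute.String("operation", operation),
    ))
}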
@@ -227,9 +243,17 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
func (p *processorImp) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
p.lock.Lock()
// use defer to pass the output to downstream components as quick as possible.
To my understanding, this would synchronize the entire processor, not allowing two calls of ConsumeTraces to run at once. I think it makes sense, since there is a lot of internal state that shouldn't be messed with. It seems that the same instance of a processor isn't executed concurrently anyway, though someone with more expertise on the collector could verify that.
In this case I think deferring the unlocking is fine.
p.lock.Unlock()
return nil, err
}
// If the service name doesn't exist, we treat it as invalid and do not generate a metric
why?
We are changing this field to be mandatory. See the PR description:
Service Name has been moved to become a default resource attribute instead of the attribute as per the resource semantic convention.
This should be captured in some way, either via logging or an additional metric, to allow for easy debugging.
At this point in time, I do not believe this PR should be merged as it is currently presented.
There are too many behavioural changes for me to consider this safe to release on its own.
I would prefer this PR to be broken down into smaller change sets so that iterating on the behaviour changes doesn't block the accepted changes.
A few things that I noticed, but I would still like this split:
defaultDimensionsCacheSize = 1000
serviceNameKey = conventions.AttributeServiceName
instrumentationLibraryName = "spanmetricsprocessor"
operationKey = "operation" // OpenTelemetry non-standard constant.
I would encourage you to use the semconv where possible.
// metricKey is used to carry the stringified metric attributes
type metricKey string

// resourceKey is used to carry the stringified resource attributes
type resourceKey string
Since there are no method receivers on these types, they are superfluous and provide no additional value.
They're not superfluous; they help make sense of the structures that use them to construct multi-dimensional maps. Rather than map[string]map[string]int, there is map[resourceKey]map[metricKey]int.
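A tiny, self-contained illustration of that readability argument (the key values below are made up):

// Named string types document what each level of the nested map is keyed by.
type resourceKey string
type metricKey string

func example() {
    // Reads as "per resource, per metric series" rather than map[string]map[string]uint64.
    counts := map[resourceKey]map[metricKey]uint64{
        "service.name=frontend": {
            "operation=GET /users": 42,
        },
    }
    _ = counts
}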
@@ -227,9 +243,17 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
func (p *processorImp) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
p.lock.Lock()
// use defer to pass the output to downstream components as quick as possible.
This is a rather expensive way to do it.
You're basically blocking processing (even if not intended) for a secondary call to ConsumeTraces, with no timeout.
You would be better off using something like atomic or a channel as a semaphore to allow a fast exit instead of queueing until the holder is done.
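A minimal sketch of the channel-as-semaphore idea (what to do with a call that loses the race is an open design question, so the early return below is purely illustrative; ctx and pdata.Traces are the same types as in the snippet above):

type processorImp struct {
    // sem is a buffered channel of capacity 1 used as a try-lock around the
    // processor's internal aggregation state.
    sem chan struct{}
}

func newProcessorImp() *processorImp {
    return &processorImp{sem: make(chan struct{}, 1)}
}

func (p *processorImp) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
    select {
    case p.sem <- struct{}{}:
        defer func() { <-p.sem }()
        // ... aggregate spans and emit metrics while holding the slot ...
        return nil
    default:
        // Another ConsumeTraces call holds the state: exit fast instead of blocking.
        // Whether to drop, retry, or queue the batch is not settled in this thread.
        return nil
    }
}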
for _, traces := range tc.traces {
// Test
traces := traces
// create an excessive concurrent usage. The processor will not be used in this way practically.
I am worried that, since we are not joining and waiting on the validation thread of this test, we are walking into creating a flaky test, since the result can arrive after the "test" has completed.
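One way to avoid that, sketched below (tc, p, and the surrounding test setup are assumed from the existing test), is to join every goroutine before the validation runs:

var wg sync.WaitGroup
for _, traces := range tc.traces {
    traces := traces
    wg.Add(1)
    go func() {
        defer wg.Done()
        assert.NoError(t, p.ConsumeTraces(context.Background(), traces))
    }()
}
// Join before validating so no result can land after the test has finished.
wg.Wait()
// ... run the metric assertions here ...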
}
return true
})
assert.Empty(t, wantResourceAttrServiceA, "Did not see all expected dimensions in metric. Missing: ", wantResourceAttrServiceA)
Would it not be faster to check if the attributes len matches the expected len?
switch k {
case notInSpanResourceAttr1:
assert.Fail(t, notInSpanResourceAttr1+" should not be in this metric")
default:
assert.Equal(t, wantResourceAttrServiceA[k], value)
delete(wantResourceAttrServiceA, k)
Why not use assert.Equal here?
It could be like:
assert.NotEqual(t, k, notInSpanResource, "Must not be resource defined...")
assert.Equal(t, wantResourceAttrService[k], v.StringVal())
delete(wantResourceAttrServiceA, v.StringVal())
@@ -227,9 +243,17 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
func (p *processorImp) ConsumeTraces(ctx context.Context, traces pdata.Traces) error {
p.lock.Lock()
// use defer to pass the output to downstream components as quick as possible.
Another approach that might be considered is to lean into concurrency. Instead of emitting metrics on every invocation of ConsumeTraces(), perhaps start a goroutine with a ticker that periodically takes the lock, builds the metrics, emits them, and resets the exemplars. I think the whole of ConsumeTraces() would probably still need to be under the lock, but it would do less work on each invocation and metrics emission would be more regular.
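Roughly, that could look like the following (the flush interval and the field and method names are assumptions made for the sake of the sketch):

// startFlushLoop emits aggregated metrics on a timer instead of on every
// ConsumeTraces call; ConsumeTraces would then only record under the lock.
func (p *processorImp) startFlushLoop(ctx context.Context) {
    ticker := time.NewTicker(15 * time.Second) // flush interval: an assumption
    go func() {
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                p.lock.Lock()
                m := p.buildMetrics()   // build from the accumulated sums/counts/buckets
                p.resetExemplarData()   // reset exemplars for the next interval
                p.lock.Unlock()
                _ = p.metricsExporter.ConsumeMetrics(ctx, m)
            }
        }
    }()
}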
@MovieStoreGuy I think there are two user-facing behaviour changes, as we put in the PR's description:
I agree we should aim to create as small a changeset as we can. But the feature change is fairly atomic and hard to break up. If the tests give us enough confidence to cover the behaviour changes, we should consider them safe. Feel free to let me know what tests should be added.
@Aneurysm9 @albertteoh Can I ask your opinion on the above comment from @MovieStoreGuy? I replied before as:
I'm open to breaking it down (probably later) if we are leaning towards that.
In general, I agree with @MovieStoreGuy on reducing the diff size, and I think there are some opportunities to do this. I also appreciate the difficulty of keeping these PRs separate, particularly when introducing the new resource attr key to internal maps. Moreover, I think GitHub handles stacked diffs quite nicely, so you can create PRs for various layers of functionality. In this PR's case, at least the
Is the former a pre-requisite for the latter? If so, perhaps there's another opportunity to break the PR up.
I think I agree that there may be too many things happening here to have a high degree of confidence in the correctness of the changes. Is it possible to structure this as a series of PRs that each take independent steps in this direction? If not, because an intermediate state would be broken or something like that, I would really like to see it rebased into a series of commits that can be viewed as a sequence of refactorings and additions.
Please see the discussion about removing the aggregation from spanmetricsprocessor: I think we should hold off on adding the new feature to this processor until the aggregation functionality is removed.
Description:
Currently, there is a bug: there is no logic to differentiate between resource attributes. This PR adds a feature that allows users to optionally specify resource attributes to append, similar to the existing dimensions mechanism.
Service Name has been moved to become a default resource attribute instead of the attribute as per the resource semantic convention.
Implementation details:
Some of the critical implementation changes:
The core data structures are changed from map[metricKey]valType to map[resourceKey]map[metricKey]valType. This ensures accurate aggregation of data based on the specified resource attributes and dimensions.
This processor is stateful. Due to the nature of its logic, concurrent executions of ConsumeTraces() would output incorrect data, so a lock forces ConsumeTraces() to execute serially. Meanwhile, the lock is not taken inside the internal functions of this processor; those internal functions are not concurrency-safe.
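As a compressed illustration of the new shape and the locking contract (the field and helper names are simplified and not the exact code):

type processorImp struct {
    lock sync.Mutex

    // Aggregated state, keyed first by resource, then by metric series.
    latencyCount map[resourceKey]map[metricKey]uint64
    latencySum   map[resourceKey]map[metricKey]float64
}

// aggregate assumes the caller (ConsumeTraces) holds p.lock; the internal
// functions themselves are not concurrency-safe.
func (p *processorImp) aggregate(rk resourceKey, mk metricKey, latency float64) {
    if _, ok := p.latencyCount[rk]; !ok {
        p.latencyCount[rk] = make(map[metricKey]uint64)
        p.latencySum[rk] = make(map[metricKey]float64)
    }
    p.latencyCount[rk][mk]++
    p.latencySum[rk][mk] += latency
}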
Potential todos:
We should consider these issues for the following improvements:
There is a bit of a discrepancy between the use of the terms "Dimensions" and "Attributes". It seems like "Attributes" is more commonly used, so I have used that term in this PR when adding in resource attributes. I propose that we rename dimensions to attributes in the future, although some backwards compatibility will need to be added to the config to support this.
Should we still fall back to searching for the attribute value from resources for dimensions? See todo.
Link to tracking Issue:
#6486
Testing:
Testing added to ensure the new structure/hierarchy of metrics, under the instrumentationLibraryMetrics structure under Resource, is generated correctly.
TestProcessorConsumeTracesConcurrentSafe is added to make sure the public function ConsumeTraces() does not cause race conditions (the tests are executed with the -race flag).
Documentation:
Usage of the new config option resource_attributes added to README.md