Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Replace mutex with channel for TelemetryCounter #1981

Open
wants to merge 2 commits into
base: release/v1.65.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 49 additions & 24 deletions counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,58 @@ import "encoding/json"
// TelemetryCounter tracks the number of times a set of resource and attribute dimensions have been seen.
type TelemetryCounter struct {
resources map[string]*ResourceCounter
commands chan func()
done chan struct{}
}

// NewTelemetryCounter creates a new TelemetryCounter.
func NewTelemetryCounter() *TelemetryCounter {
return &TelemetryCounter{
t := &TelemetryCounter{
resources: make(map[string]*ResourceCounter),
commands: make(chan func()),
done: make(chan struct{}),
}
go t.run()
return t
}

// run listens for commands to modify or read the resources.
func (t *TelemetryCounter) run() {
for {
select {
case cmd := <-t.commands:
cmd() // Execute the command
case <-t.done:
// Shutdown signal received, exit the loop
return
}
}
}

// Stop gracefully shuts down the TelemetryCounter.
func (t *TelemetryCounter) Stop() {
close(t.done)
}

// Add increments the counter with the supplied dimensions.
func (t *TelemetryCounter) Add(resource, attributes map[string]any) {
key := getDimensionKey(resource)
if _, ok := t.resources[key]; !ok {
t.resources[key] = NewResourceCounter(resource)
t.commands <- func() {
key := getDimensionKey(resource)
if _, ok := t.resources[key]; !ok {
t.resources[key] = newResourceCounter(resource)
}
t.resources[key].add(attributes)
}

t.resources[key].Add(attributes)
}

// Resources returns a map of resource ID to a counter for that resource.
func (t TelemetryCounter) Resources() map[string]*ResourceCounter {
return t.resources
}

// Reset resets the counter.
func (t *TelemetryCounter) Reset() {
t.resources = make(map[string]*ResourceCounter)
// Resources returns a map of resource ID to a counter for that resource and resets the counter.
func (t *TelemetryCounter) Resources() map[string]*ResourceCounter {
result := make(chan map[string]*ResourceCounter)
t.commands <- func() {
result <- t.resources
t.resources = make(map[string]*ResourceCounter) // Reset the counter
}
return <-result
}

// ResourceCounter dimensions the counter by resource.
Expand All @@ -55,22 +80,22 @@ type ResourceCounter struct {
attributes map[string]*AttributeCounter
}

// NewResourceCounter creates a new ResourceCounter.
func NewResourceCounter(values map[string]any) *ResourceCounter {
// newResourceCounter creates a new ResourceCounter.
func newResourceCounter(values map[string]any) *ResourceCounter {
return &ResourceCounter{
values: values,
attributes: map[string]*AttributeCounter{},
}
}

// Add increments the counter with the supplied dimensions.
func (r *ResourceCounter) Add(attributes map[string]any) {
// add increments the counter with the supplied dimensions.
func (r *ResourceCounter) add(attributes map[string]any) {
key := getDimensionKey(attributes)
if _, ok := r.attributes[key]; !ok {
r.attributes[key] = NewAttributeCounter(attributes)
r.attributes[key] = newAttributeCounter(attributes)
}

r.attributes[key].Add()
r.attributes[key].add()
}

// Attributes returns a map of attribute set ID to a counter for that attribute set.
Expand All @@ -89,15 +114,15 @@ type AttributeCounter struct {
count int
}

// NewAttributeCounter creates a new AttributeCounter.
func NewAttributeCounter(values map[string]any) *AttributeCounter {
// newAttributeCounter creates a new AttributeCounter.
func newAttributeCounter(values map[string]any) *AttributeCounter {
return &AttributeCounter{
values: values,
}
}

// Add increments the counter.
func (a *AttributeCounter) Add() {
// add increments the counter.
func (a *AttributeCounter) add() {
a.count++
}

Expand Down
13 changes: 8 additions & 5 deletions counter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ func TestLogCounter(t *testing.T) {
attrKey1 := getDimensionKey(attrMap1)
attrKey2 := getDimensionKey(attrMap2)

require.Equal(t, 10, counter.resources[resourceKey1].attributes[attrKey1].count)
require.Equal(t, 5, counter.resources[resourceKey1].attributes[attrKey2].count)
require.Equal(t, 1, counter.resources[resourceKey2].attributes[attrKey1].count)
resources := counter.Resources()

counter.Reset()
require.Len(t, counter.resources, 0)
require.Equal(t, 10, resources[resourceKey1].attributes[attrKey1].count)
require.Equal(t, 5, resources[resourceKey1].attributes[attrKey2].count)
require.Equal(t, 1, resources[resourceKey2].attributes[attrKey1].count)

// Ensure that the counter has been reset
resources = counter.Resources()
require.Len(t, resources, 0)
}
10 changes: 1 addition & 9 deletions processor/datapointcountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type metricCountProcessor struct {
logger *zap.Logger
cancel context.CancelFunc
wg sync.WaitGroup
mux sync.Mutex
}

// newExprProcessor returns a new processor with expr expressions.
Expand Down Expand Up @@ -105,13 +104,12 @@ func (p *metricCountProcessor) Shutdown(_ context.Context) error {
p.cancel()
}
p.wg.Wait()
p.counter.Stop()
return nil
}

// ConsumeMetrics processes the metrics.
func (p *metricCountProcessor) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error {
p.mux.Lock()
defer p.mux.Unlock()

if p.isOTTL() {
p.consumeMetricsOTTL(ctx, m)
Expand Down Expand Up @@ -202,9 +200,6 @@ func (p *metricCountProcessor) sendMetrics(ctx context.Context) {

// createMetrics creates metrics from the counter. The counter is reset after the metrics are created.
func (p *metricCountProcessor) createMetrics() pmetric.Metrics {
p.mux.Lock()
defer p.mux.Unlock()

metrics := pmetric.NewMetrics()
for _, resource := range p.counter.Resources() {
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
Expand All @@ -231,9 +226,6 @@ func (p *metricCountProcessor) createMetrics() pmetric.Metrics {

}
}

p.counter.Reset()

return metrics
}

Expand Down
8 changes: 1 addition & 7 deletions processor/logcountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type logCountProcessor struct {
logger *zap.Logger
cancel context.CancelFunc
wg sync.WaitGroup
mux sync.Mutex
}

// newProcessor returns a new processor.
Expand Down Expand Up @@ -100,13 +99,12 @@ func (p *logCountProcessor) Shutdown(_ context.Context) error {
p.cancel()
}
p.wg.Wait()
p.counter.Stop()
return nil
}

// ConsumeLogs processes the logs.
func (p *logCountProcessor) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
p.mux.Lock()
defer p.mux.Unlock()

if p.isOTTL() {
p.consumeLogsOTTL(ctx, pl)
Expand Down Expand Up @@ -176,16 +174,12 @@ func (p *logCountProcessor) handleMetricInterval(ctx context.Context) {

// sendMetrics sends metrics to the consumer.
func (p *logCountProcessor) sendMetrics(ctx context.Context) {
p.mux.Lock()
defer p.mux.Unlock()

metrics := p.createMetrics()
if metrics.ResourceMetrics().Len() == 0 {
return
}

p.counter.Reset()

if err := routereceiver.RouteMetrics(ctx, p.config.Route, metrics); err != nil {
p.logger.Error("Failed to send metrics", zap.Error(err))
}
Expand Down
8 changes: 1 addition & 7 deletions processor/spancountprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type spanCountProcessor struct {
logger *zap.Logger
cancel context.CancelFunc
wg sync.WaitGroup
mux sync.Mutex
}

// newProcessor returns a new processor.
Expand Down Expand Up @@ -101,13 +100,12 @@ func (p *spanCountProcessor) Shutdown(_ context.Context) error {
p.cancel()
}
p.wg.Wait()
p.counter.Stop()
return nil
}

// ConsumeMetrics processes the metrics.
func (p *spanCountProcessor) ConsumeTraces(ctx context.Context, t ptrace.Traces) error {
p.mux.Lock()
defer p.mux.Unlock()

if p.isOTTL() {
p.consumeTracesOTTL(ctx, t)
Expand Down Expand Up @@ -184,16 +182,12 @@ func (p *spanCountProcessor) handleMetricInterval(ctx context.Context) {

// sendMetrics sends metrics to the consumer.
func (p *spanCountProcessor) sendMetrics(ctx context.Context) {
p.mux.Lock()
defer p.mux.Unlock()

metrics := p.createMetrics()
if metrics.ResourceMetrics().Len() == 0 {
return
}

p.counter.Reset()

if err := routereceiver.RouteMetrics(ctx, p.config.Route, metrics); err != nil {
p.logger.Error("Failed to send metrics", zap.Error(err))
}
Expand Down
Loading