diff --git a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go
index bb0cda216fad..f9b221a2afd9 100644
--- a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go
+++ b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go
@@ -103,11 +103,17 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC
 	ids := generateSequentialIds(10000)
 	wg := &sync.WaitGroup{}
+	// Limit the concurrency here to avoid creating too many goroutines and hit
+	// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9126
+	concurrencyLimiter := make(chan struct{}, 128)
+	defer close(concurrencyLimiter)
 	for i := 0; i < len(ids); i++ {
 		wg.Add(1)
+		concurrencyLimiter <- struct{}{}
 		go func(id pcommon.TraceID) {
 			batcher.AddToCurrentBatch(id)
 			wg.Done()
+			<-concurrencyLimiter
 		}(ids[i])
 	}
diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go
index f5da628dc503..94d574c90009 100644
--- a/processor/tailsamplingprocessor/processor_test.go
+++ b/processor/tailsamplingprocessor/processor_test.go
@@ -208,16 +208,24 @@ func TestConcurrentTraceArrival(t *testing.T) {
 		require.NoError(t, tsp.Shutdown(context.Background()))
 	}()
+	// Limit the concurrency here to avoid creating too many goroutines and hit
+	// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9126
+	concurrencyLimiter := make(chan struct{}, 128)
+	defer close(concurrencyLimiter)
 	for _, batch := range batches {
 		// Add the same traceId twice.
 		wg.Add(2)
+		concurrencyLimiter <- struct{}{}
 		go func(td ptrace.Traces) {
 			require.NoError(t, tsp.ConsumeTraces(context.Background(), td))
 			wg.Done()
+			<-concurrencyLimiter
 		}(batch)
+		concurrencyLimiter <- struct{}{}
 		go func(td ptrace.Traces) {
 			require.NoError(t, tsp.ConsumeTraces(context.Background(), td))
 			wg.Done()
+			<-concurrencyLimiter
 		}(batch)
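
For reference, the pattern both tests now use is a buffered channel acting as a counting semaphore: the producer loop blocks on the send once 128 goroutines are in flight, and each goroutine frees its slot when it finishes. The sketch below is illustrative only and not part of the change; the item count, the 128-slot limit, and the placeholder work (`_ = n * n`) are stand-ins for the real test bodies.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := &sync.WaitGroup{}

	// Buffered channel used as a counting semaphore: at most 128 goroutines
	// can hold a slot at once, so the loop never has an unbounded number of
	// goroutines running at the same time.
	concurrencyLimiter := make(chan struct{}, 128)
	defer close(concurrencyLimiter)

	for i := 0; i < 10000; i++ {
		wg.Add(1)
		// Acquire a slot; this blocks the producer loop once 128 goroutines
		// are already running.
		concurrencyLimiter <- struct{}{}
		go func(n int) {
			_ = n * n // placeholder for the real per-item work in the tests
			wg.Done()
			// Release the slot so the producer can start the next goroutine.
			<-concurrencyLimiter
		}(i)
	}

	wg.Wait()
	fmt.Println("done")
}
```

Two details worth noting about the shape of the change: acquiring the slot before spawning the goroutine keeps the limit strict (the loop itself stalls rather than queuing work), and passing the loop value as a parameter (`ids[i]`, `batch`) avoids the loop-variable capture pitfall on Go versions prior to 1.22.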