From a937ac370b37b660543c4eac01b9b45b2cfe90e7 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Tue, 7 Nov 2023 21:47:27 -0800 Subject: [PATCH] [chore][processort/tailsamplingprocessor] Limit concurrency for certain tests (flay test on Windows runners) (#29014) **Description:** Limit number of goroutines started during `processor/tailsamplingprocessor` tests. This causes very frequently failures on the Windows tests, see [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/28682#issuecomment-1797088954) for example. The issue is that the race detector has a hard limit on number of goroutines, see https://github.com/golang/go/issues/23611. The fix limits the concurrency in two tests so this limit is not hit on GH Windows runners. **Link to tracking Issue:** Fix #9126 **Testing:** Increased the concurrency on the two changed tests caused the error and validated that it passed twice on my fork. **Documentation:** N/A --- .../internal/idbatcher/id_batcher_test.go | 6 ++++++ processor/tailsamplingprocessor/processor_test.go | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go index bb0cda216fad..f9b221a2afd9 100644 --- a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go +++ b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go @@ -103,11 +103,17 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC ids := generateSequentialIds(10000) wg := &sync.WaitGroup{} + // Limit the concurrency here to avoid creating too many goroutines and hit + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9126 + concurrencyLimiter := make(chan struct{}, 128) + defer close(concurrencyLimiter) for i := 0; i < len(ids); i++ { wg.Add(1) + concurrencyLimiter <- struct{}{} go func(id pcommon.TraceID) { batcher.AddToCurrentBatch(id) wg.Done() + <-concurrencyLimiter }(ids[i]) } diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index f5da628dc503..94d574c90009 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -208,16 +208,24 @@ func TestConcurrentTraceArrival(t *testing.T) { require.NoError(t, tsp.Shutdown(context.Background())) }() + // Limit the concurrency here to avoid creating too many goroutines and hit + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9126 + concurrencyLimiter := make(chan struct{}, 128) + defer close(concurrencyLimiter) for _, batch := range batches { // Add the same traceId twice. wg.Add(2) + concurrencyLimiter <- struct{}{} go func(td ptrace.Traces) { require.NoError(t, tsp.ConsumeTraces(context.Background(), td)) wg.Done() + <-concurrencyLimiter }(batch) + concurrencyLimiter <- struct{}{} go func(td ptrace.Traces) { require.NoError(t, tsp.ConsumeTraces(context.Background(), td)) wg.Done() + <-concurrencyLimiter }(batch) }