diff --git a/server/application/logs.go b/server/application/logs.go index 778f04edec66e..b52eef81e2e77 100644 --- a/server/application/logs.go +++ b/server/application/logs.go @@ -120,16 +120,22 @@ func mergeLogStreams(streams []chan logEntry, bufferingDuration time.Duration) c var sentAt time.Time ticker := time.NewTicker(bufferingDuration) + done := make(chan struct{}) go func() { - for range ticker.C { - sentAtLock.Lock() - // waited long enough for logs from each streams, send everything accumulated - if sentAt.Add(bufferingDuration).Before(time.Now()) { - _ = send(true) - sentAt = time.Now() - } + for { + select { + case <-done: + return + case <-ticker.C: + sentAtLock.Lock() + // waited long enough for logs from each streams, send everything accumulated + if sentAt.Add(bufferingDuration).Before(time.Now()) { + _ = send(true) + sentAt = time.Now() + } - sentAtLock.Unlock() + sentAtLock.Unlock() + } } }() @@ -145,6 +151,11 @@ func mergeLogStreams(streams []chan logEntry, bufferingDuration time.Duration) c _ = send(true) ticker.Stop() + // ticker.Stop() does not close the channel, and it does not wait for the channel to be drained. So we need to + // explicitly prevent the gorountine from leaking by closing the channel. We also need to prevent the goroutine + // from calling `send` again, because `send` pushes to the `merged` channel which we're about to close. + // This describes the approach nicely: https://stackoverflow.com/questions/17797754/ticker-stop-behaviour-in-golang + done <- struct{}{} close(merged) }() return merged diff --git a/server/application/logs_test.go b/server/application/logs_test.go index 76bd5df134ae9..7a565e37efa79 100644 --- a/server/application/logs_test.go +++ b/server/application/logs_test.go @@ -75,3 +75,33 @@ func TestMergeLogStreams(t *testing.T) { assert.Equal(t, []string{"1", "2", "3", "4"}, lines) } + +func TestMergeLogStreams_RaceCondition(t *testing.T) { + // Test for regression of this issue: https://github.com/argoproj/argo-cd/issues/7006 + for i := 0; i < 5000; i++ { + first := make(chan logEntry) + second := make(chan logEntry) + + go func() { + parseLogsStream("first", io.NopCloser(strings.NewReader(`2021-02-09T00:00:01Z 1`)), first) + time.Sleep(time.Duration(i%3) * time.Millisecond) + close(first) + }() + + go func() { + parseLogsStream("second", io.NopCloser(strings.NewReader(`2021-02-09T00:00:02Z 2`)), second) + time.Sleep(time.Duration((i+1)%3) * time.Millisecond) + close(second) + }() + + merged := mergeLogStreams([]chan logEntry{first, second}, 1*time.Millisecond) + + // Drain the channel + for range merged { + } + + // This test intentionally doesn't test the order of the output. Under these intense conditions, the test would + // fail often due to out of order entries. This test is only meant to reproduce a race between a channel writer + // and channel closer. + } +}