Skip to content

Commit 4471603

Browse files
authored
fix(api): send to closed channel in mergeLogStreams (#7006) (#21178)
* fix(api): send to closed channel in mergeLogStreams (#7006) Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> * more intense test Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> * even more intense Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> * remove unnecessary comment Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> * fix the race condition Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --------- Signed-off-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com>
1 parent 99efafb commit 4471603

File tree

2 files changed

+49
-8
lines changed

2 files changed

+49
-8
lines changed

server/application/logs.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,22 @@ func mergeLogStreams(streams []chan logEntry, bufferingDuration time.Duration) c
120120
var sentAt time.Time
121121

122122
ticker := time.NewTicker(bufferingDuration)
123+
done := make(chan struct{})
123124
go func() {
124-
for range ticker.C {
125-
sentAtLock.Lock()
126-
// waited long enough for logs from each streams, send everything accumulated
127-
if sentAt.Add(bufferingDuration).Before(time.Now()) {
128-
_ = send(true)
129-
sentAt = time.Now()
130-
}
125+
for {
126+
select {
127+
case <-done:
128+
return
129+
case <-ticker.C:
130+
sentAtLock.Lock()
131+
// waited long enough for logs from each streams, send everything accumulated
132+
if sentAt.Add(bufferingDuration).Before(time.Now()) {
133+
_ = send(true)
134+
sentAt = time.Now()
135+
}
131136

132-
sentAtLock.Unlock()
137+
sentAtLock.Unlock()
138+
}
133139
}
134140
}()
135141

@@ -145,6 +151,11 @@ func mergeLogStreams(streams []chan logEntry, bufferingDuration time.Duration) c
145151
_ = send(true)
146152

147153
ticker.Stop()
154+
// ticker.Stop() does not close the channel, and it does not wait for the channel to be drained. So we need to
155+
// explicitly prevent the gorountine from leaking by closing the channel. We also need to prevent the goroutine
156+
// from calling `send` again, because `send` pushes to the `merged` channel which we're about to close.
157+
// This describes the approach nicely: https://stackoverflow.com/questions/17797754/ticker-stop-behaviour-in-golang
158+
done <- struct{}{}
148159
close(merged)
149160
}()
150161
return merged

server/application/logs_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,33 @@ func TestMergeLogStreams(t *testing.T) {
7575

7676
assert.Equal(t, []string{"1", "2", "3", "4"}, lines)
7777
}
78+
79+
func TestMergeLogStreams_RaceCondition(t *testing.T) {
80+
// Test for regression of this issue: https://github.com/argoproj/argo-cd/issues/7006
81+
for i := 0; i < 5000; i++ {
82+
first := make(chan logEntry)
83+
second := make(chan logEntry)
84+
85+
go func() {
86+
parseLogsStream("first", io.NopCloser(strings.NewReader(`2021-02-09T00:00:01Z 1`)), first)
87+
time.Sleep(time.Duration(i%3) * time.Millisecond)
88+
close(first)
89+
}()
90+
91+
go func() {
92+
parseLogsStream("second", io.NopCloser(strings.NewReader(`2021-02-09T00:00:02Z 2`)), second)
93+
time.Sleep(time.Duration((i+1)%3) * time.Millisecond)
94+
close(second)
95+
}()
96+
97+
merged := mergeLogStreams([]chan logEntry{first, second}, 1*time.Millisecond)
98+
99+
// Drain the channel
100+
for range merged {
101+
}
102+
103+
// This test intentionally doesn't test the order of the output. Under these intense conditions, the test would
104+
// fail often due to out of order entries. This test is only meant to reproduce a race between a channel writer
105+
// and channel closer.
106+
}
107+
}

0 commit comments

Comments
 (0)