diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index e48d52484799..97c0b8cc2db9 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -431,15 +431,28 @@ func (c *Comparator) pruneEntries(currentTime time.Time) { func (c *Comparator) confirmMissing(currentTime time.Time) { // Because we are querying loki timestamps vs the timestamp in the log, // make the range +/- 10 seconds to allow for clock inaccuracies + c.missingMtx.Lock() + if len(c.missingEntries) == 0 { + c.missingMtx.Unlock() + return + } start := *c.missingEntries[0] start = start.Add(-10 * time.Second) end := *c.missingEntries[len(c.missingEntries)-1] end = end.Add(10 * time.Second) + c.missingMtx.Unlock() + recvd, err := c.rdr.Query(start, end) if err != nil { fmt.Fprintf(c.w, "error querying loki: %s\n", err) return } + // Now that query has returned, take out the lock on the missingEntries list so we can modify it + // It's possible more entries were added to this list but that's ok, if they match something in the + // query result we will remove them, if they don't they won't be old enough yet to remove. + c.missingMtx.Lock() + defer c.missingMtx.Unlock() + // This is to help debug some missing log entries when queried, // let's print exactly what we are missing and what Loki sent back for _, r := range c.missingEntries { @@ -449,11 +462,6 @@ func (c *Comparator) confirmMissing(currentTime time.Time) { fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano()) } - // Now that query has returned, take out the lock on the missingEntries list so we can modify it - // It's possible more entries were added to this list but that's ok, if they match something in the - // query result we will remove them, if they don't they won't be old enough yet to remove. - c.missingMtx.Lock() - defer c.missingMtx.Unlock() k := 0 for i, m := range c.missingEntries { found := false diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go index 8f11f3eb16b7..7dcb73c2a1c4 100644 --- a/pkg/canary/comparator/comparator_test.go +++ b/pkg/canary/comparator/comparator_test.go @@ -218,6 +218,45 @@ func TestEntryNeverReceived(t *testing.T) { } +// Ensure that if confirmMissing calls pile up and run concurrently, this doesn't cause a panic +func TestConcurrentConfirmMissing(t *testing.T) { + found := []time.Time{ + time.Unix(0, 0), + time.UnixMilli(1), + time.UnixMilli(2), + } + mr := &mockReader{resp: found} + + output := &bytes.Buffer{} + + wait := 30 * time.Millisecond + maxWait := 30 * time.Millisecond + + c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) + + for _, t := range found { + c.missingEntries = append(c.missingEntries, &t) + } + + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NotPanics(t, func() { c.confirmMissing(time.UnixMilli(3)) }) + }() + } + assert.Eventually( + t, + func() bool { + wg.Wait() + return true + }, + time.Second, + 10*time.Millisecond, + ) +} + func TestPruneAckdEntires(t *testing.T) { actual := &bytes.Buffer{} wait := 30 * time.Millisecond