diff --git a/CHANGELOG.md b/CHANGELOG.md index 181d2dcde8d6b..8198d3e1905ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,10 @@ ## Main - +* [5568](https://github.com/grafana/loki/pull/5568) **afayngelerindbx**: Fix canary panics due to concurrent execution of `confirmMissing` * [5552](https://github.com/grafana/loki/pull/5552) **jiachengxu**: Loki mixin: add `DiskSpaceUtilizationPanel` * [5541](https://github.com/grafana/loki/pull/5541) **bboreham**: Queries: reject very deeply nested regexps which could crash Loki. * [5536](https://github.com/grafana/loki/pull/5536) **jiachengxu**: Loki mixin: make labelsSelector in loki chunks dashboards configurable * [5535](https://github.com/grafana/loki/pull/5535) **jiachengxu**: Loki mixins: use labels selector for loki chunks dashboard -* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric. +* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric. * [5356](https://github.com/grafana/loki/pull/5356) **jbschami**: Enhance lambda-promtail to support adding extra labels from an environment variable value * [5409](https://github.com/grafana/loki/pull/5409) **ldb**: Enable best effort parsing for Syslog messages * [5392](https://github.com/grafana/loki/pull/5392) **MichelHollands**: Etcd credentials are parsed as secrets instead of plain text now. diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index e48d52484799c..97c0b8cc2db9e 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -431,15 +431,28 @@ func (c *Comparator) pruneEntries(currentTime time.Time) { func (c *Comparator) confirmMissing(currentTime time.Time) { // Because we are querying loki timestamps vs the timestamp in the log, // make the range +/- 10 seconds to allow for clock inaccuracies + c.missingMtx.Lock() + if len(c.missingEntries) == 0 { + c.missingMtx.Unlock() + return + } start := *c.missingEntries[0] start = start.Add(-10 * time.Second) end := *c.missingEntries[len(c.missingEntries)-1] end = end.Add(10 * time.Second) + c.missingMtx.Unlock() + recvd, err := c.rdr.Query(start, end) if err != nil { fmt.Fprintf(c.w, "error querying loki: %s\n", err) return } + // Now that query has returned, take out the lock on the missingEntries list so we can modify it + // It's possible more entries were added to this list but that's ok, if they match something in the + // query result we will remove them, if they don't they won't be old enough yet to remove. + c.missingMtx.Lock() + defer c.missingMtx.Unlock() + // This is to help debug some missing log entries when queried, // let's print exactly what we are missing and what Loki sent back for _, r := range c.missingEntries { @@ -449,11 +462,6 @@ func (c *Comparator) confirmMissing(currentTime time.Time) { fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano()) } - // Now that query has returned, take out the lock on the missingEntries list so we can modify it - // It's possible more entries were added to this list but that's ok, if they match something in the - // query result we will remove them, if they don't they won't be old enough yet to remove. - c.missingMtx.Lock() - defer c.missingMtx.Unlock() k := 0 for i, m := range c.missingEntries { found := false diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go index 8f11f3eb16b78..db2c1b341bd6b 100644 --- a/pkg/canary/comparator/comparator_test.go +++ b/pkg/canary/comparator/comparator_test.go @@ -218,6 +218,46 @@ func TestEntryNeverReceived(t *testing.T) { } +// Ensure that if confirmMissing calls pile up and run concurrently, this doesn't cause a panic +func TestConcurrentConfirmMissing(t *testing.T) { + found := []time.Time{ + time.Unix(0, 0), + time.UnixMilli(1), + time.UnixMilli(2), + } + mr := &mockReader{resp: found} + + output := &bytes.Buffer{} + + wait := 30 * time.Millisecond + maxWait := 30 * time.Millisecond + + c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) + + for _, t := range found { + tCopy := t + c.missingEntries = append(c.missingEntries, &tCopy) + } + + wg := sync.WaitGroup{} + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NotPanics(t, func() { c.confirmMissing(time.UnixMilli(3)) }) + }() + } + assert.Eventually( + t, + func() bool { + wg.Wait() + return true + }, + time.Second, + 10*time.Millisecond, + ) +} + func TestPruneAckdEntires(t *testing.T) { actual := &bytes.Buffer{} wait := 30 * time.Millisecond