canary: Adds locking to prevent multiple concurrent invocations of confirmMissing from clobbering each other #5568

Merged: 3 commits merged on Mar 10, 2022
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -1,10 +1,10 @@
## Main

* [5568](https://github.com/grafana/loki/pull/5568) **afayngelerindbx**: Fix canary panics due to concurrent execution of `confirmMissing`
* [5552](https://github.com/grafana/loki/pull/5552) **jiachengxu**: Loki mixin: add `DiskSpaceUtilizationPanel`
* [5541](https://github.com/grafana/loki/pull/5541) **bboreham**: Queries: reject very deeply nested regexps which could crash Loki.
* [5536](https://github.com/grafana/loki/pull/5536) **jiachengxu**: Loki mixin: make labelsSelector in loki chunks dashboards configurable
* [5535](https://github.com/grafana/loki/pull/5535) **jiachengxu**: Loki mixins: use labels selector for loki chunks dashboard
* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric.
* [5507](https://github.com/grafana/loki/pull/5507) **MichelHollands**: Remove extra param in call for inflightRequests metric.
* [5356](https://github.com/grafana/loki/pull/5356) **jbschami**: Enhance lambda-promtail to support adding extra labels from an environment variable value
* [5409](https://github.com/grafana/loki/pull/5409) **ldb**: Enable best effort parsing for Syslog messages
* [5392](https://github.com/grafana/loki/pull/5392) **MichelHollands**: Etcd credentials are parsed as secrets instead of plain text now.
18 changes: 13 additions & 5 deletions pkg/canary/comparator/comparator.go
@@ -431,15 +431,28 @@ func (c *Comparator) pruneEntries(currentTime time.Time) {
func (c *Comparator) confirmMissing(currentTime time.Time) {
// Because we are querying loki timestamps vs the timestamp in the log,
// make the range +/- 10 seconds to allow for clock inaccuracies
c.missingMtx.Lock()
if len(c.missingEntries) == 0 {
c.missingMtx.Unlock()
return
}
start := *c.missingEntries[0]
start = start.Add(-10 * time.Second)
end := *c.missingEntries[len(c.missingEntries)-1]
end = end.Add(10 * time.Second)
c.missingMtx.Unlock()

recvd, err := c.rdr.Query(start, end)
if err != nil {
fmt.Fprintf(c.w, "error querying loki: %s\n", err)
return
}
// Now that query has returned, take out the lock on the missingEntries list so we can modify it
// It's possible more entries were added to this list but that's ok, if they match something in the
// query result we will remove them, if they don't they won't be old enough yet to remove.
c.missingMtx.Lock()
defer c.missingMtx.Unlock()

// This is to help debug some missing log entries when queried,
// let's print exactly what we are missing and what Loki sent back
for _, r := range c.missingEntries {
@@ -449,11 +462,6 @@ func (c *Comparator) confirmMissing(currentTime time.Time) {
fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano())
}

-// Now that query has returned, take out the lock on the missingEntries list so we can modify it
-// It's possible more entries were added to this list but that's ok, if they match something in the
-// query result we will remove them, if they don't they won't be old enough yet to remove.
-c.missingMtx.Lock()
-defer c.missingMtx.Unlock()
k := 0
for i, m := range c.missingEntries {
found := false
40 changes: 40 additions & 0 deletions pkg/canary/comparator/comparator_test.go
@@ -218,6 +218,46 @@ func TestEntryNeverReceived(t *testing.T) {

}

// Ensure that if confirmMissing calls pile up and run concurrently, this doesn't cause a panic
func TestConcurrentConfirmMissing(t *testing.T) {
found := []time.Time{
time.Unix(0, 0),
time.UnixMilli(1),
time.UnixMilli(2),
}
mr := &mockReader{resp: found}

output := &bytes.Buffer{}

wait := 30 * time.Millisecond
maxWait := 30 * time.Millisecond

c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

for _, t := range found {
tCopy := t
c.missingEntries = append(c.missingEntries, &tCopy)
}

wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
Contributor:

Would it be better to add a sync.WaitGroup to make sure all goroutines have completed by the end of the test?

Contributor Author:

Good suggestion, adding it.

Contributor:

Thanks for adding :)

  1. Can we change wg.Done() to defer wg.Done() at the beginning, before assert.NotPanics is called? That way wg.Done() runs even if that code panics.
  2. I'm not familiar with assert.Eventually, but shouldn't a simple wg.Wait() after the for loop be sufficient here? Not sure if I'm missing anything.

Contributor Author:

I believe assert.NotPanics recovers from the panic and returns false. I'm adding the defer anyway to make the code easier to reason about.

For 2, I would recommend keeping Eventually there. The expectation is that this test should return fairly quickly (within 1s). A bare wg.Wait() at the end would make the test suite time out (default 10m) if one of those goroutines hits a deadlock.

defer wg.Done()
assert.NotPanics(t, func() { c.confirmMissing(time.UnixMilli(3)) })
}()
}
assert.Eventually(
t,
func() bool {
wg.Wait()
return true
},
time.Second,
10*time.Millisecond,
)
}

func TestPruneAckdEntires(t *testing.T) {
actual := &bytes.Buffer{}
wait := 30 * time.Millisecond