Skip to content

Commit

Permalink
adds locking to prevent multiple concurrent invocations of
Browse files Browse the repository at this point in the history
`confirmMissing` from clobbering each other
  • Loading branch information
afayngelerindbx committed Mar 10, 2022
1 parent 813373b commit 9c183f5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
18 changes: 13 additions & 5 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,15 +431,28 @@ func (c *Comparator) pruneEntries(currentTime time.Time) {
func (c *Comparator) confirmMissing(currentTime time.Time) {
// Because we are querying loki timestamps vs the timestamp in the log,
// make the range +/- 10 seconds to allow for clock inaccuracies
c.missingMtx.Lock()
if len(c.missingEntries) == 0 {
c.missingMtx.Unlock()
return
}
start := *c.missingEntries[0]
start = start.Add(-10 * time.Second)
end := *c.missingEntries[len(c.missingEntries)-1]
end = end.Add(10 * time.Second)
c.missingMtx.Unlock()

recvd, err := c.rdr.Query(start, end)
if err != nil {
fmt.Fprintf(c.w, "error querying loki: %s\n", err)
return
}
// Now that query has returned, take out the lock on the missingEntries list so we can modify it
// It's possible more entries were added to this list but that's ok, if they match something in the
// query result we will remove them, if they don't they won't be old enough yet to remove.
c.missingMtx.Lock()
defer c.missingMtx.Unlock()

// This is to help debug some missing log entries when queried,
// let's print exactly what we are missing and what Loki sent back
for _, r := range c.missingEntries {
Expand All @@ -449,11 +462,6 @@ func (c *Comparator) confirmMissing(currentTime time.Time) {
fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano())
}

// Now that query has returned, take out the lock on the missingEntries list so we can modify it
// It's possible more entries were added to this list but that's ok, if they match something in the
// query result we will remove them, if they don't they won't be old enough yet to remove.
c.missingMtx.Lock()
defer c.missingMtx.Unlock()
k := 0
for i, m := range c.missingEntries {
found := false
Expand Down
39 changes: 39 additions & 0 deletions pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,45 @@ func TestEntryNeverReceived(t *testing.T) {

}

// Ensure that if confirmMissing calls pile up and run concurrently, this doesn't cause a panic
func TestConcurrentConfirmMissing(t *testing.T) {
found := []time.Time{
time.Unix(0, 0),
time.UnixMilli(1),
time.UnixMilli(2),
}
mr := &mockReader{resp: found}

output := &bytes.Buffer{}

wait := 30 * time.Millisecond
maxWait := 30 * time.Millisecond

c := NewComparator(output, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

for _, t := range found {
c.missingEntries = append(c.missingEntries, &t)
}

wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NotPanics(t, func() { c.confirmMissing(time.UnixMilli(3)) })
}()
}
assert.Eventually(
t,
func() bool {
wg.Wait()
return true
},
time.Second,
10*time.Millisecond,
)
}

func TestPruneAckdEntires(t *testing.T) {
actual := &bytes.Buffer{}
wait := 30 * time.Millisecond
Expand Down

0 comments on commit 9c183f5

Please sign in to comment.