Skip to content

Commit

Permalink
fix(canary): Fix Flaky Tests (#889)
Browse files Browse the repository at this point in the history
* make the missing entry lookup synchronous during tests to hopefully avoid races around the entry order and test verifications

* playing around with timings some more to get all tests to pass even on a really slow machine
  • Loading branch information
slim-bean authored Aug 13, 2019
1 parent a407099 commit 173e8ea
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
2 changes: 1 addition & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {

w := writer.NewWriter(os.Stdout, sentChan, *interval, *size)
r := reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *lName, *lVal)
c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r)
c := comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *buckets, sentChan, receivedChan, r, true)

http.Handle("/metrics", promhttp.Handler())
go func() {
Expand Down
13 changes: 11 additions & 2 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Comparator struct {
ackdEntries []*time.Time
maxWait time.Duration
pruneInterval time.Duration
confirmAsync bool
sent chan time.Time
recv chan time.Time
rdr reader.LokiReader
Expand All @@ -69,12 +70,13 @@ type Comparator struct {
}

func NewComparator(writer io.Writer, maxWait time.Duration, pruneInterval time.Duration,
buckets int, sentChan chan time.Time, receivedChan chan time.Time, reader reader.LokiReader) *Comparator {
buckets int, sentChan chan time.Time, receivedChan chan time.Time, reader reader.LokiReader, confirmAsync bool) *Comparator {
c := &Comparator{
w: writer,
entries: []*time.Time{},
maxWait: maxWait,
pruneInterval: pruneInterval,
confirmAsync: confirmAsync,
sent: sentChan,
recv: receivedChan,
rdr: reader,
Expand Down Expand Up @@ -160,6 +162,8 @@ func (c *Comparator) entryReceived(ts time.Time) {
}

// Size reports the current length of c.entries. The length is read while
// holding entMtx so the value is consistent with concurrent updates made
// under the same mutex.
func (c *Comparator) Size() int {
	c.entMtx.Lock()
	n := len(c.entries)
	c.entMtx.Unlock()
	return n
}

Expand Down Expand Up @@ -209,7 +213,12 @@ func (c *Comparator) pruneEntries() {
}
c.entries = c.entries[:k]
if len(missing) > 0 {
go c.confirmMissing(missing)
if c.confirmAsync {
go c.confirmMissing(missing)
} else {
c.confirmMissing(missing)
}

}

// Prune the acknowledged list, remove anything older than our maxwait
Expand Down
27 changes: 16 additions & 11 deletions pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -157,7 +157,8 @@ func TestEntryNeverReceived(t *testing.T) {

mr := &mockReader{found}
maxWait := 50 * time.Millisecond
c := NewComparator(actual, maxWait, 2*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), mr)
// We set the prune interval to a huge value here so that the timer never fires; instead, we call pruneEntries manually below.
c := NewComparator(actual, maxWait, 50*time.Hour, 1, make(chan time.Time), make(chan time.Time), mr, false)

c.entrySent(t1)
c.entrySent(t2)
Expand All @@ -173,15 +174,17 @@ func TestEntryNeverReceived(t *testing.T) {

assert.Equal(t, 2, c.Size())

//Wait a few maxWait intervals just to make sure all entries are expired and the async confirmMissing has completed
//Wait a few maxWait intervals just to make sure all entries are expired
<-time.After(2 * maxWait)

expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ErrEntryNotReceivedWs+ErrEntryNotReceived+ErrEntryNotReceivedWs,
c.pruneEntries()

expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ErrEntryNotReceived,
t3, []time.Time{t2},
t5, []time.Time{t2, t4},
t2.UnixNano(), maxWait.Seconds(),
t2.UnixNano(), maxWait.Seconds(),
t4.UnixNano(), maxWait.Seconds())
t4.UnixNano(), maxWait.Seconds(),
t2.UnixNano(), maxWait.Seconds())

assert.Equal(t, expected, actual.String())
assert.Equal(t, 0, c.Size())
Expand All @@ -202,12 +205,13 @@ func TestEntryNeverReceived(t *testing.T) {
func TestPruneAckdEntires(t *testing.T) {
actual := &bytes.Buffer{}
maxWait := 30 * time.Millisecond
c := NewComparator(actual, maxWait, 10*time.Millisecond, 1, make(chan time.Time), make(chan time.Time), nil)
// We set the prune interval to a huge value here so that the timer never fires; instead, we call pruneEntries manually below.
c := NewComparator(actual, maxWait, 50*time.Hour, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Millisecond)
t3 := t2.Add(1 * time.Millisecond)
t4 := t3.Add(100 * time.Millisecond)
t4 := t3.Add(100 * time.Second)

assert.Equal(t, 0, len(c.ackdEntries))

Expand All @@ -228,8 +232,9 @@ func TestPruneAckdEntires(t *testing.T) {
assert.Equal(t, 4, len(c.ackdEntries))

// Wait a couple maxWaits to make sure the first 3 timestamps get pruned from the ackdEntries,
// the fourth should still remain because it was 100ms newer than t3
// the fourth should still remain because it's much, much newer and we only prune things older than maxWait
<-time.After(2 * maxWait)
c.pruneEntries()

assert.Equal(t, 1, len(c.ackdEntries))
assert.Equal(t, t4, *c.ackdEntries[0])
Expand Down

0 comments on commit 173e8ea

Please sign in to comment.