Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request vitessio#5642 from planetscale/ss-deflake-vstreamer3
Browse files Browse the repository at this point in the history
vrepl: yet another vstreamer deflake
  • Loading branch information
morgo authored Jan 2, 2020
2 parents d0bca3c + 6614afe commit e25368f
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,11 +1083,17 @@ func TestHeartbeat(t *testing.T) {
assert.Equal(t, binlogdatapb.VEventType_HEARTBEAT, evs[0].Type)
}

func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, postion string) {
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := startStream(ctx, t, filter, postion)
ch := startStream(ctx, t, filter, position)

// If position is 'current', we wait for a heartbeat to be
// sure the vstreamer has started.
if position == "current" {
<-ch
}

for _, tcase := range testcases {
switch input := tcase.input.(type) {
Expand All @@ -1108,6 +1114,8 @@ func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, p

func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan []*binlogdatapb.VEvent, output [][]string) {
t.Helper()
timer := time.NewTimer(1 * time.Minute)
defer timer.Stop()
for _, wantset := range output {
var evs []*binlogdatapb.VEvent
for {
Expand All @@ -1125,6 +1133,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
}
case <-ctx.Done():
t.Fatal("stream ended early")
case <-timer.C:
t.Fatalf("timed out waiting for events: %v", wantset)
}
if len(evs) != 0 {
break
Expand Down

0 comments on commit e25368f

Please sign in to comment.