Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
vrepl: yet another vstreamer deflake
Browse files Browse the repository at this point in the history
The latest stream from "current" position feature introduced
a hang in the vstreamer, if the subsequent DMLs started getting
played before the vstreamer would start pulling from binlogs.

The fix is to wait till we get a heartbeat before proceeding
with the tests.

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
  • Loading branch information
sougou committed Jan 2, 2020
1 parent d0bca3c commit 6614afe
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,11 +1083,17 @@ func TestHeartbeat(t *testing.T) {
assert.Equal(t, binlogdatapb.VEventType_HEARTBEAT, evs[0].Type)
}

func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, postion string) {
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := startStream(ctx, t, filter, postion)
ch := startStream(ctx, t, filter, position)

// If position is 'current', we wait for a heartbeat to be
// sure the vstreamer has started.
if position == "current" {
<-ch
}

for _, tcase := range testcases {
switch input := tcase.input.(type) {
Expand All @@ -1108,6 +1114,8 @@ func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, p

func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan []*binlogdatapb.VEvent, output [][]string) {
t.Helper()
timer := time.NewTimer(1 * time.Minute)
defer timer.Stop()
for _, wantset := range output {
var evs []*binlogdatapb.VEvent
for {
Expand All @@ -1125,6 +1133,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
}
case <-ctx.Done():
t.Fatal("stream ended early")
case <-timer.C:
t.Fatalf("timed out waiting for events: %v", wantset)
}
if len(evs) != 0 {
break
Expand Down

0 comments on commit 6614afe

Please sign in to comment.