From 6614afe3586b489975ecf03b4e6f6893a4a14710 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Wed, 1 Jan 2020 20:04:57 -0800 Subject: [PATCH] vrepl: yet another vstreamer deflake The latest stream from "current" position feature introduced a hang in the vstreamer, if the subsequent DMLs started getting played before the vstreamer would start pulling from binlogs. The fix is to wait till we get a heartbeat before proceeding with the tests. Signed-off-by: Sugu Sougoumarane --- .../tabletserver/vstreamer/vstreamer_test.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index fa11342189d..1d5d382c66e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1083,11 +1083,17 @@ func TestHeartbeat(t *testing.T) { assert.Equal(t, binlogdatapb.VEventType_HEARTBEAT, evs[0].Type) } -func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, postion string) { +func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string) { t.Helper() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ch := startStream(ctx, t, filter, postion) + ch := startStream(ctx, t, filter, position) + + // If position is 'current', we wait for a heartbeat to be + // sure the vstreamer has started. + if position == "current" { + <-ch + } for _, tcase := range testcases { switch input := tcase.input.(type) { @@ -1108,6 +1114,8 @@ func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, p func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan []*binlogdatapb.VEvent, output [][]string) { t.Helper() + timer := time.NewTimer(1 * time.Minute) + defer timer.Stop() for _, wantset := range output { var evs []*binlogdatapb.VEvent for { @@ -1125,6 +1133,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [ } case <-ctx.Done(): t.Fatal("stream ended early") + case <-timer.C: + t.Fatalf("timed out waiting for events: %v", wantset) } if len(evs) != 0 { break