From 6638c54d1ded4452fe55f7d1b94bd399cc2785cf Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 11 Mar 2021 12:08:31 +0100 Subject: [PATCH] Implement upper bound on the update interval Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/vplayer.go | 3 +- .../vreplication/vplayer_flaky_test.go | 40 +++++++++++-------- .../tabletmanager/vreplication/vreplicator.go | 5 ++- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index b1999702c87..3e04b1d5b0e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -262,7 +262,8 @@ func (vp *vplayer) updateCurrentTime(tm int64) error { } func (vp *vplayer) mustUpdateCurrentTime() bool { - return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval + return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval || + vp.numAccumulatedHeartbeats >= vreplicationMinimumHeartbeatUpdateInterval } func (vp *vplayer) recordHeartbeat() error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index ab75df054d2..99357e79ef4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -47,23 +47,29 @@ func TestHeartbeatFrequencyFlag(t *testing.T) { stats := binlogplayer.NewStats() vp := &vplayer{vr: &vreplicator{dbClient: newVDBClient(realDBClientFactory(), stats), stats: stats}} - t.Run("default frequency", func(t *testing.T) { - vp.numAccumulatedHeartbeats = 0 - require.False(t, vp.mustUpdateCurrentTime()) - vp.numAccumulatedHeartbeats = 1 - require.True(t, vp.mustUpdateCurrentTime()) - }) - - *vreplicationHeartbeatUpdateInterval = 4 - - t.Run("custom frequency", func(t *testing.T) { - vp.numAccumulatedHeartbeats = 0 - require.False(t, vp.mustUpdateCurrentTime()) - vp.numAccumulatedHeartbeats = 3 - require.False(t, vp.mustUpdateCurrentTime()) - vp.numAccumulatedHeartbeats = 4 - require.True(t, vp.mustUpdateCurrentTime()) - }) + type testcount struct { + count int + mustUpdate bool + } + type testcase struct { + name string + interval int + counts []testcount + } + testcases := []*testcase{ + {"default frequency", 1, []testcount{{count: 0, mustUpdate: false}, {1, true}}}, + {"custom frequency", 4, []testcount{{count: 0, mustUpdate: false}, {count: 3, mustUpdate: false}, {4, true}}}, + {"minumum frequency", 61, []testcount{{count: 59, mustUpdate: false}, {count: 60, mustUpdate: true}, {61, true}}}, + } + for _, tcase := range testcases { + t.Run(tcase.name, func(t *testing.T) { + *vreplicationHeartbeatUpdateInterval = tcase.interval + for _, tcount := range tcase.counts { + vp.numAccumulatedHeartbeats = tcount.count + require.Equal(t, tcount.mustUpdate, vp.mustUpdateCurrentTime()) + } + }) + } } func TestVReplicationTimeUpdated(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 62c4444bb3e..47c77324d15 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -54,7 +54,10 @@ var ( // outages. Keep this high if // you have too many streams the extra write qps or cpu load due to these updates are unacceptable // you have too many streams and/or a large source field (lot of participating tables) which generates unacceptable increase in your binlog size - vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds) at which the time_updated column of a vreplication stream when idling") + vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling") + // vreplicationMinimumHeartbeatUpdateInterval overrides vreplicationHeartbeatUpdateInterval if the latter is higher than this + // to ensure that it satisfies liveness criteria implicitly expected by internal processes like Online DDL + vreplicationMinimumHeartbeatUpdateInterval = 60 ) // vreplicator provides the core logic to start vreplication streams