Skip to content

Commit

Permalink
Implement upper bound on the update interval
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Mar 11, 2021
1 parent 18a4a64 commit 6638c54
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 19 deletions.
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ func (vp *vplayer) updateCurrentTime(tm int64) error {
}

func (vp *vplayer) mustUpdateCurrentTime() bool {
return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval
return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval ||
vp.numAccumulatedHeartbeats >= vreplicationMinimumHeartbeatUpdateInterval
}

func (vp *vplayer) recordHeartbeat() error {
Expand Down
40 changes: 23 additions & 17 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,29 @@ func TestHeartbeatFrequencyFlag(t *testing.T) {
stats := binlogplayer.NewStats()
vp := &vplayer{vr: &vreplicator{dbClient: newVDBClient(realDBClientFactory(), stats), stats: stats}}

t.Run("default frequency", func(t *testing.T) {
vp.numAccumulatedHeartbeats = 0
require.False(t, vp.mustUpdateCurrentTime())
vp.numAccumulatedHeartbeats = 1
require.True(t, vp.mustUpdateCurrentTime())
})

*vreplicationHeartbeatUpdateInterval = 4

t.Run("custom frequency", func(t *testing.T) {
vp.numAccumulatedHeartbeats = 0
require.False(t, vp.mustUpdateCurrentTime())
vp.numAccumulatedHeartbeats = 3
require.False(t, vp.mustUpdateCurrentTime())
vp.numAccumulatedHeartbeats = 4
require.True(t, vp.mustUpdateCurrentTime())
})
type testcount struct {
count int
mustUpdate bool
}
type testcase struct {
name string
interval int
counts []testcount
}
testcases := []*testcase{
{"default frequency", 1, []testcount{{count: 0, mustUpdate: false}, {1, true}}},
{"custom frequency", 4, []testcount{{count: 0, mustUpdate: false}, {count: 3, mustUpdate: false}, {4, true}}},
{"minumum frequency", 61, []testcount{{count: 59, mustUpdate: false}, {count: 60, mustUpdate: true}, {61, true}}},
}
for _, tcase := range testcases {
t.Run(tcase.name, func(t *testing.T) {
*vreplicationHeartbeatUpdateInterval = tcase.interval
for _, tcount := range tcase.counts {
vp.numAccumulatedHeartbeats = tcount.count
require.Equal(t, tcount.mustUpdate, vp.mustUpdateCurrentTime())
}
})
}
}

func TestVReplicationTimeUpdated(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ var (
// outages. Keep this high if
// you have too many streams the extra write qps or cpu load due to these updates are unacceptable
// you have too many streams and/or a large source field (lot of participating tables) which generates unacceptable increase in your binlog size
vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds) at which the time_updated column of a vreplication stream when idling")
vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling")
// vreplicationMinimumHeartbeatUpdateInterval overrides vreplicationHeartbeatUpdateInterval if the latter is higher than this
// to ensure that it satisfies liveness criteria implicitly expected by internal processes like Online DDL
vreplicationMinimumHeartbeatUpdateInterval = 60
)

// vreplicator provides the core logic to start vreplication streams
Expand Down

0 comments on commit 6638c54

Please sign in to comment.