diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 77f688d9d31..3e04b1d5b0e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -58,6 +58,9 @@ type vplayer struct { lastTimestampNs int64 // timeOffsetNs keeps track of the clock difference with respect to source tablet. timeOffsetNs int64 + // numAccumulatedHeartbeats keeps track of how many heartbeats have been received since we updated the time_updated column of _vt.vreplication + numAccumulatedHeartbeats int + // canAcceptStmtEvents is set to true if the current player can accept events in statement mode. Only true for filters that are match all. canAcceptStmtEvents bool @@ -227,6 +230,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row } func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { + vp.numAccumulatedHeartbeats = 0 update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts) if _, err := vp.vr.dbClient.Execute(update); err != nil { return false, fmt.Errorf("error %v updating position", err) @@ -246,9 +250,7 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { return posReached, nil } -func (vp *vplayer) recordHeartbeat() (err error) { - tm := time.Now().Unix() - vp.vr.stats.RecordHeartbeat(tm) +func (vp *vplayer) updateCurrentTime(tm int64) error { update, err := binlogplayer.GenerateUpdateTime(vp.vr.id, tm) if err != nil { return err @@ -259,6 +261,21 @@ func (vp *vplayer) recordHeartbeat() (err error) { return nil } +func (vp *vplayer) mustUpdateCurrentTime() bool { + return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval || + vp.numAccumulatedHeartbeats >= vreplicationMinimumHeartbeatUpdateInterval +} + +func (vp *vplayer) recordHeartbeat() error { + tm := time.Now().Unix() + vp.vr.stats.RecordHeartbeat(tm) + if !vp.mustUpdateCurrentTime() { 
+ return nil + } + vp.numAccumulatedHeartbeats = 0 + return vp.updateCurrentTime(tm) +} + // applyEvents is the main thread that applies the events. It has the following use // cases to take into account: // * Normal transaction that has row mutations. In this case, the transaction @@ -606,11 +623,13 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF case binlogdatapb.VEventType_HEARTBEAT: if !vp.vr.dbClient.InTransaction { + vp.numAccumulatedHeartbeats++ err := vp.recordHeartbeat() if err != nil { return err } } } + return nil } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 24e2a1773c9..99357e79ef4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -38,6 +38,40 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) +func TestHeartbeatFrequencyFlag(t *testing.T) { + origVReplicationHeartbeatUpdateInterval := *vreplicationHeartbeatUpdateInterval + defer func() { + *vreplicationHeartbeatUpdateInterval = origVReplicationHeartbeatUpdateInterval + }() + + stats := binlogplayer.NewStats() + vp := &vplayer{vr: &vreplicator{dbClient: newVDBClient(realDBClientFactory(), stats), stats: stats}} + + type testcount struct { + count int + mustUpdate bool + } + type testcase struct { + name string + interval int + counts []testcount + } + testcases := []*testcase{ + {"default frequency", 1, []testcount{{count: 0, mustUpdate: false}, {1, true}}}, + {"custom frequency", 4, []testcount{{count: 0, mustUpdate: false}, {count: 3, mustUpdate: false}, {4, true}}}, + {"minumum frequency", 61, []testcount{{count: 59, mustUpdate: false}, {count: 60, mustUpdate: true}, {61, true}}}, + } + for _, tcase := range testcases { + t.Run(tcase.name, func(t *testing.T) { + *vreplicationHeartbeatUpdateInterval = tcase.interval + for _, tcount 
:= range tcase.counts { + vp.numAccumulatedHeartbeats = tcount.count + require.Equal(t, tcount.mustUpdate, vp.mustUpdateCurrentTime()) + } + }) + } +} + func TestVReplicationTimeUpdated(t *testing.T) { ctx := context.Background() defer deleteTablet(addTablet(100)) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 5cd14fd8733..ddd36ce9a02 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -41,12 +41,23 @@ var ( // idleTimeout is set to slightly above 1s, compared to heartbeatTime // set by VStreamer at slightly below 1s. This minimizes conflicts // between the two timeouts. - idleTimeout = 1100 * time.Millisecond + idleTimeout = 1100 * time.Millisecond + dbLockRetryDelay = 1 * time.Second relayLogMaxSize = flag.Int("relay_log_max_size", 250000, "Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time.") relayLogMaxItems = flag.Int("relay_log_max_items", 5000, "Maximum number of rows for VReplication target buffering.") copyTimeout = 1 * time.Hour replicaLagTolerance = 10 * time.Second + + // vreplicationHeartbeatUpdateInterval determines how often the time_updated column is updated if there are no real events on the source and the source + // vstream is only sending heartbeats for this long. Keep this low if you expect high QPS and are monitoring this column to alert about potential + // outages. 
Keep this high if + // you have so many streams that the extra write qps or cpu load due to these updates is unacceptable, or + // you have too many streams and/or a large source field (lot of participating tables), which generates an unacceptable increase in your binlog size + vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling") + // vreplicationMinimumHeartbeatUpdateInterval overrides vreplicationHeartbeatUpdateInterval if the latter is higher than this + // to ensure that it satisfies liveness criteria implicitly expected by internal processes like Online DDL + vreplicationMinimumHeartbeatUpdateInterval = 60 ) // vreplicator provides the core logic to start vreplication streams @@ -87,6 +98,10 @@ type vreplicator struct { // More advanced constructs can be used. Please see the table plan builder // documentation for more info. func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreamer VStreamerClient, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, vre *Engine) *vreplicator { + if *vreplicationHeartbeatUpdateInterval > vreplicationMinimumHeartbeatUpdateInterval { + log.Warningf("the supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d", + *vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval) + } return &vreplicator{ vre: vre, id: id,