Skip to content

Commit

Permalink
Merge pull request #7659 from planetscale/rn-vr-heartbeat-update
Browse files Browse the repository at this point in the history
Make the frequency at which heartbeats update the _vt.vreplication table configurable
  • Loading branch information
rohit-nayak-ps authored Mar 14, 2021
2 parents 58fc9a1 + 3e3617d commit 6d5204b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 4 deletions.
25 changes: 22 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type vplayer struct {
lastTimestampNs int64
// timeOffsetNs keeps track of the clock difference with respect to source tablet.
timeOffsetNs int64
// numAccumulatedHeartbeats keeps track of how many heartbeats have been received since we updated the time_updated column of _vt.vreplication
numAccumulatedHeartbeats int

// canAcceptStmtEvents is set to true if the current player can accept events in statement mode. Only true for filters that are match all.
canAcceptStmtEvents bool

Expand Down Expand Up @@ -227,6 +230,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
}

func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
vp.numAccumulatedHeartbeats = 0
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts)
if _, err := vp.vr.dbClient.Execute(update); err != nil {
return false, fmt.Errorf("error %v updating position", err)
Expand All @@ -246,9 +250,7 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
return posReached, nil
}

func (vp *vplayer) recordHeartbeat() (err error) {
tm := time.Now().Unix()
vp.vr.stats.RecordHeartbeat(tm)
func (vp *vplayer) updateCurrentTime(tm int64) error {
update, err := binlogplayer.GenerateUpdateTime(vp.vr.id, tm)
if err != nil {
return err
Expand All @@ -259,6 +261,21 @@ func (vp *vplayer) recordHeartbeat() (err error) {
return nil
}

// mustUpdateCurrentTime reports whether the number of heartbeats accumulated
// since the counter was last reset has reached either the configured
// vreplicationHeartbeatUpdateInterval or the hard ceiling
// vreplicationMinimumHeartbeatUpdateInterval, whichever is hit first.
func (vp *vplayer) mustUpdateCurrentTime() bool {
	accumulated := vp.numAccumulatedHeartbeats
	// The ceiling applies regardless of what the flag is set to.
	if accumulated >= vreplicationMinimumHeartbeatUpdateInterval {
		return true
	}
	return accumulated >= *vreplicationHeartbeatUpdateInterval
}

// recordHeartbeat records the current Unix time in the stream's stats on every
// call. Only when mustUpdateCurrentTime reports that enough heartbeats have
// accumulated does it reset the heartbeat counter and hand the same timestamp
// to updateCurrentTime, whose error (if any) is returned to the caller.
func (vp *vplayer) recordHeartbeat() error {
	now := time.Now().Unix()
	vp.vr.stats.RecordHeartbeat(now)
	if vp.mustUpdateCurrentTime() {
		// Reset before the update so the next interval starts from zero.
		vp.numAccumulatedHeartbeats = 0
		return vp.updateCurrentTime(now)
	}
	return nil
}

// applyEvents is the main thread that applies the events. It has the following use
// cases to take into account:
// * Normal transaction that has row mutations. In this case, the transaction
Expand Down Expand Up @@ -606,11 +623,13 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return io.EOF
case binlogdatapb.VEventType_HEARTBEAT:
if !vp.vr.dbClient.InTransaction {
vp.numAccumulatedHeartbeats++
err := vp.recordHeartbeat()
if err != nil {
return err
}
}
}

return nil
}
34 changes: 34 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,40 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// TestHeartbeatFrequencyFlag checks mustUpdateCurrentTime against several
// values of the vreplication_heartbeat_update_interval flag: for each
// configured interval it sets the accumulated heartbeat count directly and
// asserts whether an update is due. The last case uses an interval above
// vreplicationMinimumHeartbeatUpdateInterval to confirm that the ceiling
// takes over.
func TestHeartbeatFrequencyFlag(t *testing.T) {
	// The flag is package-level state; restore it so other tests are unaffected.
	origVReplicationHeartbeatUpdateInterval := *vreplicationHeartbeatUpdateInterval
	defer func() {
		*vreplicationHeartbeatUpdateInterval = origVReplicationHeartbeatUpdateInterval
	}()

	stats := binlogplayer.NewStats()
	vp := &vplayer{vr: &vreplicator{dbClient: newVDBClient(realDBClientFactory(), stats), stats: stats}}

	type testcount struct {
		count      int
		mustUpdate bool
	}
	type testcase struct {
		name     string
		interval int
		counts   []testcount
	}
	testcases := []*testcase{
		{
			name:     "default frequency",
			interval: 1,
			counts: []testcount{
				{count: 0, mustUpdate: false},
				{count: 1, mustUpdate: true},
			},
		},
		{
			name:     "custom frequency",
			interval: 4,
			counts: []testcount{
				{count: 0, mustUpdate: false},
				{count: 3, mustUpdate: false},
				{count: 4, mustUpdate: true},
			},
		},
		{
			name:     "minimum frequency",
			interval: 61,
			counts: []testcount{
				{count: 59, mustUpdate: false},
				{count: 60, mustUpdate: true},
				{count: 61, mustUpdate: true},
			},
		},
	}
	for _, tcase := range testcases {
		t.Run(tcase.name, func(t *testing.T) {
			*vreplicationHeartbeatUpdateInterval = tcase.interval
			for _, tcount := range tcase.counts {
				vp.numAccumulatedHeartbeats = tcount.count
				// Include the inputs in the failure message so the failing row is identifiable.
				require.Equal(t, tcount.mustUpdate, vp.mustUpdateCurrentTime(),
					"interval=%d accumulated heartbeats=%d", tcase.interval, tcount.count)
			}
		})
	}
}

func TestVReplicationTimeUpdated(t *testing.T) {
ctx := context.Background()
defer deleteTablet(addTablet(100))
Expand Down
17 changes: 16 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,23 @@ var (
// idleTimeout is set to slightly above 1s, compared to heartbeatTime
// set by VStreamer at slightly below 1s. This minimizes conflicts
// between the two timeouts.
idleTimeout = 1100 * time.Millisecond
idleTimeout = 1100 * time.Millisecond

dbLockRetryDelay = 1 * time.Second
relayLogMaxSize = flag.Int("relay_log_max_size", 250000, "Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time.")
relayLogMaxItems = flag.Int("relay_log_max_items", 5000, "Maximum number of rows for VReplication target buffering.")
copyTimeout = 1 * time.Hour
replicaLagTolerance = 10 * time.Second

// vreplicationHeartbeatUpdateInterval determines how often the time_updated column is updated if there are no real events on the source and the source
// vstream is only sending heartbeats for this long. Keep this low if you expect high QPS and are monitoring this column to alert about potential
// outages. Keep this high if either:
// * you have so many streams that the extra write qps or cpu load due to these updates is unacceptable, or
// * you have too many streams and/or a large source field (lot of participating tables), which generates an unacceptable increase in your binlog size
vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling")
// vreplicationMinimumHeartbeatUpdateInterval overrides vreplicationHeartbeatUpdateInterval if the latter is higher than this
// to ensure that it satisfies liveness criteria implicitly expected by internal processes like Online DDL
vreplicationMinimumHeartbeatUpdateInterval = 60
)

// vreplicator provides the core logic to start vreplication streams
Expand Down Expand Up @@ -87,6 +98,10 @@ type vreplicator struct {
// More advanced constructs can be used. Please see the table plan builder
// documentation for more info.
func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreamer VStreamerClient, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, vre *Engine) *vreplicator {
if *vreplicationHeartbeatUpdateInterval > vreplicationMinimumHeartbeatUpdateInterval {
log.Warningf("the supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d",
*vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval)
}
return &vreplicator{
vre: vre,
id: id,
Expand Down

0 comments on commit 6d5204b

Please sign in to comment.