Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the frequency at which heartbeats update the _vt.vreplication table configurable #7659

Merged
merged 4 commits into from
Mar 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type vplayer struct {
lastTimestampNs int64
// timeOffsetNs keeps track of the clock difference with respect to source tablet.
timeOffsetNs int64
// numAccumulatedHeartbeats keeps track of how many heartbeats have been received since we updated the time_updated column of _vt.vreplication
numAccumulatedHeartbeats int

// canAcceptStmtEvents is set to true if the current player can accept events in statement mode. Only true for filters that are match all.
canAcceptStmtEvents bool

Expand Down Expand Up @@ -227,6 +230,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
}

func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
vp.numAccumulatedHeartbeats = 0
update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts)
if _, err := vp.vr.dbClient.Execute(update); err != nil {
return false, fmt.Errorf("error %v updating position", err)
Expand All @@ -246,9 +250,7 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) {
return posReached, nil
}

func (vp *vplayer) recordHeartbeat() (err error) {
tm := time.Now().Unix()
vp.vr.stats.RecordHeartbeat(tm)
func (vp *vplayer) updateCurrentTime(tm int64) error {
update, err := binlogplayer.GenerateUpdateTime(vp.vr.id, tm)
if err != nil {
return err
Expand All @@ -259,6 +261,21 @@ func (vp *vplayer) recordHeartbeat() (err error) {
return nil
}

// mustUpdateCurrentTime reports whether enough heartbeats have accumulated
// since the last update to warrant refreshing the stream's current time.
// It returns true once the count reaches the configured
// vreplicationHeartbeatUpdateInterval, or once it reaches the hard cap
// vreplicationMinimumHeartbeatUpdateInterval, whichever threshold is hit first.
func (vp *vplayer) mustUpdateCurrentTime() bool {
	accumulated := vp.numAccumulatedHeartbeats
	// The user-configured interval is checked first.
	if accumulated >= *vreplicationHeartbeatUpdateInterval {
		return true
	}
	// The cap applies even when the configured interval is larger than it.
	return accumulated >= vreplicationMinimumHeartbeatUpdateInterval
}

// recordHeartbeat registers a heartbeat in the stream's stats using the
// current Unix time. If mustUpdateCurrentTime says an update is due, it resets
// the accumulated heartbeat counter and calls updateCurrentTime with that same
// timestamp, returning its error. Otherwise it returns nil without calling it.
func (vp *vplayer) recordHeartbeat() error {
	now := time.Now().Unix()
	// Stats are recorded on every heartbeat, whether or not an update is due.
	vp.vr.stats.RecordHeartbeat(now)
	if vp.mustUpdateCurrentTime() {
		// Reset before the update so the next interval starts from zero.
		vp.numAccumulatedHeartbeats = 0
		return vp.updateCurrentTime(now)
	}
	return nil
}

// applyEvents is the main thread that applies the events. It has the following use
// cases to take into account:
// * Normal transaction that has row mutations. In this case, the transaction
Expand Down Expand Up @@ -606,11 +623,13 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return io.EOF
case binlogdatapb.VEventType_HEARTBEAT:
if !vp.vr.dbClient.InTransaction {
vp.numAccumulatedHeartbeats++
err := vp.recordHeartbeat()
if err != nil {
return err
}
}
}

return nil
}
34 changes: 34 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,40 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// TestHeartbeatFrequencyFlag checks that mustUpdateCurrentTime honors the
// value of vreplicationHeartbeatUpdateInterval for several accumulated
// heartbeat counts, including an interval (61) above the value of
// vreplicationMinimumHeartbeatUpdateInterval.
func TestHeartbeatFrequencyFlag(t *testing.T) {
	// Restore the flag value on exit so other tests see the original setting.
	savedInterval := *vreplicationHeartbeatUpdateInterval
	defer func() {
		*vreplicationHeartbeatUpdateInterval = savedInterval
	}()

	stats := binlogplayer.NewStats()
	vp := &vplayer{vr: &vreplicator{dbClient: newVDBClient(realDBClientFactory(), stats), stats: stats}}

	// testcount pairs an accumulated heartbeat count with the expected decision.
	type testcount struct {
		count      int
		mustUpdate bool
	}
	// testcase groups the expectations for one configured interval.
	type testcase struct {
		name     string
		interval int
		counts   []testcount
	}
	testcases := []*testcase{
		{
			name:     "default frequency",
			interval: 1,
			counts: []testcount{
				{count: 0, mustUpdate: false},
				{count: 1, mustUpdate: true},
			},
		},
		{
			name:     "custom frequency",
			interval: 4,
			counts: []testcount{
				{count: 0, mustUpdate: false},
				{count: 3, mustUpdate: false},
				{count: 4, mustUpdate: true},
			},
		},
		{
			name:     "minumum frequency",
			interval: 61,
			counts: []testcount{
				{count: 59, mustUpdate: false},
				{count: 60, mustUpdate: true},
				{count: 61, mustUpdate: true},
			},
		},
	}
	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			*vreplicationHeartbeatUpdateInterval = tc.interval
			for _, expectation := range tc.counts {
				// Set the counter directly; only the decision function is under test.
				vp.numAccumulatedHeartbeats = expectation.count
				require.Equal(t, expectation.mustUpdate, vp.mustUpdateCurrentTime())
			}
		})
	}
}

func TestVReplicationTimeUpdated(t *testing.T) {
ctx := context.Background()
defer deleteTablet(addTablet(100))
Expand Down
17 changes: 16 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,23 @@ var (
// idleTimeout is set to slightly above 1s, compared to heartbeatTime
// set by VStreamer at slightly below 1s. This minimizes conflicts
// between the two timeouts.
idleTimeout = 1100 * time.Millisecond
idleTimeout = 1100 * time.Millisecond

dbLockRetryDelay = 1 * time.Second
relayLogMaxSize = flag.Int("relay_log_max_size", 250000, "Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time.")
relayLogMaxItems = flag.Int("relay_log_max_items", 5000, "Maximum number of rows for VReplication target buffering.")
copyTimeout = 1 * time.Hour
replicaLagTolerance = 10 * time.Second

// vreplicationHeartbeatUpdateInterval determines how often the time_updated column is updated when there are no real events on the source and the
// source vstream has only been sending heartbeats for this long. Keep this low if you expect high QPS and are monitoring this column to alert
// about potential outages. Keep this high if:
//   * you have too many streams and the extra write QPS or CPU load due to these updates is unacceptable, or
//   * you have too many streams and/or a large source field (lots of participating tables), which generates an unacceptable increase in your binlog size.
vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling")
// vreplicationMinimumHeartbeatUpdateInterval overrides vreplicationHeartbeatUpdateInterval if the latter is higher than this
// to ensure that it satisfies liveness criteria implicitly expected by internal processes like Online DDL
vreplicationMinimumHeartbeatUpdateInterval = 60
)

// vreplicator provides the core logic to start vreplication streams
Expand Down Expand Up @@ -87,6 +98,10 @@ type vreplicator struct {
// More advanced constructs can be used. Please see the table plan builder
// documentation for more info.
func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreamer VStreamerClient, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, vre *Engine) *vreplicator {
if *vreplicationHeartbeatUpdateInterval > vreplicationMinimumHeartbeatUpdateInterval {
log.Warningf("the supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d",
*vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval)
}
return &vreplicator{
vre: vre,
id: id,
Expand Down