Skip to content

Commit

Permalink
Merge pull request #4637 from planetscale/ss-vheart
Browse files Browse the repository at this point in the history
vreplication: improved lag tracking
  • Loading branch information
sougou authored Mar 3, 2019
2 parents 6585831 + bec54fd commit 31178f9
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 153 deletions.
282 changes: 148 additions & 134 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/relaylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type relayLog struct {
curSize int
items [][]*binlogdatapb.VEvent
timedout bool
err error
// canAccept is true if: curSize<=maxSize, len(items)<maxItems, and ctx is not Done.
canAccept sync.Cond
// hasItems is true if len(items)>0, ctx is not Done, and interuptFetch is false.
Expand Down
27 changes: 22 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import (
)

var (
idleTimeout = 1 * time.Second
// idleTimeout is set to slightly above 1s, compared to heartbeatTime
// set by VStreamer at slightly below 1s. This minimizes conflicts
// between the two timeouts.
idleTimeout = 1100 * time.Millisecond
dbLockRetryDelay = 1 * time.Second
relayLogMaxSize = 10000
relayLogMaxItems = 1000
Expand All @@ -61,7 +64,11 @@ type vplayer struct {
unsavedGTID *binlogdatapb.VEvent
// timeLastSaved is set every time a GTID is saved.
timeLastSaved time.Time
stopPos mysql.Position
// lastTimestampNs is the timestamp of the last event seen so far that carried
// a non-zero Timestamp, converted from seconds to nanoseconds.
lastTimestampNs int64
// timeOffsetNs keeps track of the clock difference with respect to source tablet.
timeOffsetNs int64
stopPos mysql.Position

// pplan is built based on the source Filter at the beginning.
pplan *PlayerPlan
Expand Down Expand Up @@ -197,6 +204,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if err != nil {
return err
}
// No events were received. This likely means that there's a network partition.
// So, we should assume we're falling behind.
if len(items) == 0 {
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
vp.stats.SecondsBehindMaster.Set(behind / 1e9)
}
// Filtered replication often ends up receiving a large number of empty transactions.
// This is required because the player needs to know the latest position of the source.
// This allows it to stop at that position if requested.
Expand All @@ -221,6 +234,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
for i, events := range items {
for j, event := range events {
if event.Timestamp != 0 {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
vp.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp)
}
mustSave := false
switch event.Type {
case binlogdatapb.VEventType_COMMIT:
Expand Down Expand Up @@ -354,6 +372,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return err
}
}
case binlogdatapb.VEventType_HEARTBEAT:
// No-op: heartbeat timings are calculated in outer loop.
}
return nil
}
Expand Down Expand Up @@ -444,9 +464,6 @@ func (vp *vplayer) updatePos(ts int64) error {
vp.unsavedGTID = nil
vp.timeLastSaved = time.Now()
vp.stats.SetLastPosition(vp.pos)
if ts != 0 {
vp.stats.SecondsBehindMaster.Set(vp.timeLastSaved.Unix() - ts)
}
return nil
}

Expand Down
33 changes: 30 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"flag"
"fmt"
"io"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand All @@ -36,6 +37,11 @@ import (

var packetSize = flag.Int("vstream_packet_size", 10000, "Suggested packet size for VReplication streamer. This is used only as a recommendation. The actual packet size may be more or less than this amount.")

// heartbeatTime is how long the streamer waits without receiving an event
// before it sends a HEARTBEAT event. It is set to slightly below 1s, compared
// to idleTimeout set by VPlayer at slightly above 1s. This minimizes conflicts
// between the two timeouts.
var heartbeatTime = 900 * time.Millisecond

type vstreamer struct {
ctx context.Context
cancel func()
Expand Down Expand Up @@ -132,9 +138,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD:
// We never have to send GTID, BEGIN or FIELD events on their own.
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL:
// COMMIT and DDL are terminal. There may be no more events after
// these for a long time. So, we have to send whatever we have.
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_HEARTBEAT:
// COMMIT, DDL and HEARTBEAT must be immediately sent.
bufferedEvents = append(bufferedEvents, vevent)
vevents := bufferedEvents
bufferedEvents = nil
Expand Down Expand Up @@ -167,7 +172,16 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}

// Main loop: calls bufferAndTransmit as events arrive.
timer := time.NewTimer(heartbeatTime)
defer timer.Stop()
for {
timer.Reset(heartbeatTime)
// Drain event if timer fired before reset.
select {
case <-timer.C:
default:
}

select {
case ev, ok := <-events:
if !ok {
Expand Down Expand Up @@ -196,6 +210,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
case <-ctx.Done():
return nil
case <-timer.C:
now := time.Now().UnixNano()
if err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
CurrentTime: now,
}); err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("error sending event: %v", err)
}
}
}
}
Expand Down Expand Up @@ -392,6 +418,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
for _, vevent := range vevents {
vevent.Timestamp = int64(ev.Timestamp())
vevent.CurrentTime = time.Now().UnixNano()
}
return vevents, nil
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
t.Fatalf("%v: evs\n%v, want\n%v", input, evs, wantset)
}
for i, want := range wantset {
// CurrentTime is not testable.
evs[i].CurrentTime = 0
switch want {
case "gtid|begin":
if evs[i].Type != binlogdatapb.VEventType_GTID && evs[i].Type != binlogdatapb.VEventType_BEGIN {
Expand Down
3 changes: 3 additions & 0 deletions proto/binlogdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ enum VEventType {
OTHER = 11;
ROW = 12;
FIELD = 13;
HEARTBEAT = 14;
}

// RowChange represents one row change
Expand Down Expand Up @@ -212,6 +213,8 @@ message VEvent {
string ddl = 4;
RowEvent row_event = 5;
FieldEvent field_event = 6;
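// current_time is the sender's clock reading, in nanoseconds since the Unix
// epoch, taken when the event was generated. The receiver compares it with
// its own clock to handle clock skew.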
int64 current_time = 20;
}

// VStreamRequest is the payload for VStream
Expand Down
32 changes: 22 additions & 10 deletions py/vtproto/binlogdata_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 31178f9

Please sign in to comment.