Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vreplication: improved lag tracking #4637

Merged
merged 2 commits into from
Mar 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 148 additions & 134 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/relaylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type relayLog struct {
curSize int
items [][]*binlogdatapb.VEvent
timedout bool
err error
// canAccept is true if: curSize<=maxSize, len(items)<maxItems, and ctx is not Done.
canAccept sync.Cond
// hasItems is true if len(items)>0, ctx is not Done, and interuptFetch is false.
Expand Down
27 changes: 22 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import (
)

var (
idleTimeout = 1 * time.Second
// idleTimeout is set to slightly above 1s, compared to heartbeatTime
// set by VStreamer at slightly below 1s. This minimizes conflicts
// between the two timeouts.
idleTimeout = 1100 * time.Millisecond
dbLockRetryDelay = 1 * time.Second
relayLogMaxSize = 10000
relayLogMaxItems = 1000
Expand All @@ -61,7 +64,11 @@ type vplayer struct {
unsavedGTID *binlogdatapb.VEvent
// timeLastSaved is set every time a GTID is saved.
timeLastSaved time.Time
stopPos mysql.Position
// lastTimestampNs is the last timestamp seen so far.
lastTimestampNs int64
// timeOffsetNs keeps track of the clock difference with respect to source tablet.
timeOffsetNs int64
stopPos mysql.Position

// pplan is built based on the source Filter at the beginning.
pplan *PlayerPlan
Expand Down Expand Up @@ -197,6 +204,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if err != nil {
return err
}
// No events were received. This likely means that there's a network partition.
// So, we should assume we're falling behind.
if len(items) == 0 {
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
vp.stats.SecondsBehindMaster.Set(behind / 1e9)
}
// Filtered replication often ends up receiving a large number of empty transactions.
// This is required because the player needs to know the latest position of the source.
// This allows it to stop at that position if requested.
Expand All @@ -221,6 +234,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
for i, events := range items {
for j, event := range events {
if event.Timestamp != 0 {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
vp.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp)
}
mustSave := false
switch event.Type {
case binlogdatapb.VEventType_COMMIT:
Expand Down Expand Up @@ -354,6 +372,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return err
}
}
case binlogdatapb.VEventType_HEARTBEAT:
// No-op: heartbeat timings are calculated in outer loop.
}
return nil
}
Expand Down Expand Up @@ -444,9 +464,6 @@ func (vp *vplayer) updatePos(ts int64) error {
vp.unsavedGTID = nil
vp.timeLastSaved = time.Now()
vp.stats.SetLastPosition(vp.pos)
if ts != 0 {
vp.stats.SecondsBehindMaster.Set(vp.timeLastSaved.Unix() - ts)
}
return nil
}

Expand Down
33 changes: 30 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"flag"
"fmt"
"io"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand All @@ -36,6 +37,11 @@ import (

var packetSize = flag.Int("vstream_packet_size", 10000, "Suggested packet size for VReplication streamer. This is used only as a recommendation. The actual packet size may be more or less than this amount.")

// heartbeatTime is set to slightly below 1s, compared to idleTimeout
// set by VPlayer at slightly above 1s. This minimizes conflicts
// between the two timeouts.
var heartbeatTime = 900 * time.Millisecond

type vstreamer struct {
ctx context.Context
cancel func()
Expand Down Expand Up @@ -132,9 +138,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD:
// We never have to send GTID, BEGIN or FIELD events on their own.
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL:
// COMMIT and DDL are terminal. There may be no more events after
// these for a long time. So, we have to send whatever we have.
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_HEARTBEAT:
// COMMIT, DDL and HEARTBEAT must be immediately sent.
bufferedEvents = append(bufferedEvents, vevent)
vevents := bufferedEvents
bufferedEvents = nil
Expand Down Expand Up @@ -167,7 +172,16 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}

// Main loop: calls bufferAndTransmit as events arrive.
timer := time.NewTimer(heartbeatTime)
defer timer.Stop()
for {
timer.Reset(heartbeatTime)
// Drain event if timer fired before reset.
select {
case <-timer.C:
default:
}

select {
case ev, ok := <-events:
if !ok {
Expand Down Expand Up @@ -196,6 +210,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
case <-ctx.Done():
return nil
case <-timer.C:
now := time.Now().UnixNano()
if err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
CurrentTime: now,
}); err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("error sending event: %v", err)
}
}
}
}
Expand Down Expand Up @@ -392,6 +418,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
for _, vevent := range vevents {
vevent.Timestamp = int64(ev.Timestamp())
vevent.CurrentTime = time.Now().UnixNano()
}
return vevents, nil
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
t.Fatalf("%v: evs\n%v, want\n%v", input, evs, wantset)
}
for i, want := range wantset {
// CurrentTime is not testable.
evs[i].CurrentTime = 0
switch want {
case "gtid|begin":
if evs[i].Type != binlogdatapb.VEventType_GTID && evs[i].Type != binlogdatapb.VEventType_BEGIN {
Expand Down
3 changes: 3 additions & 0 deletions proto/binlogdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ enum VEventType {
OTHER = 11;
ROW = 12;
FIELD = 13;
HEARTBEAT = 14;
}

// RowChange represents one row change
Expand Down Expand Up @@ -212,6 +213,8 @@ message VEvent {
string ddl = 4;
RowEvent row_event = 5;
FieldEvent field_event = 6;
// current_time specifies the current time to handle clock skew.
int64 current_time = 20;
}

// VStreamRequest is the payload for VStream
Expand Down
32 changes: 22 additions & 10 deletions py/vtproto/binlogdata_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.