Skip to content

Commit

Permalink
Vrepl Savepoint: handle savepoint in binlog, treated as no-ops
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Jul 16, 2020
1 parent 6032234 commit 50a5009
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 130 deletions.
244 changes: 124 additions & 120 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func Preview(sql string) StatementType {
return StmtUpdate
case "delete":
return StmtDelete
case "savepoint":
return StmtSavepoint
}
// For the following statements it is not sufficient to rely
// on loweredFirstWord. This is because they are not statements
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return io.EOF
case binlogdatapb.VEventType_HEARTBEAT:
// No-op: heartbeat timings are calculated in outer loop.
case binlogdatapb.VEventType_SAVEPOINT:
// No-op: savepoints are not applied
}
return nil
}
54 changes: 54 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,60 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func TestPlayerSavepoint(t *testing.T) {
defer deleteTablet(addTablet(100))
execStatements(t, []string{
"create table t1(id int, primary key(id))",
fmt.Sprintf("create table %s.t1(id int, primary key(id))", vrepldb),
})
defer execStatements(t, []string{
"drop table t1",
fmt.Sprintf("drop table %s.t1", vrepldb),
})
env.SchemaEngine.Reload(context.Background())

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*",
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
OnDdl: binlogdatapb.OnDDLAction_IGNORE,
}
cancel, _ := startVReplication(t, bls, "")
// Issue a dummy change to ensure vreplication is initialized. Otherwise there
// is a race between the DDLs and the schema loader of vstreamer.
// Root cause seems to be with MySQL where t1 shows up in information_schema before
// the actual table is created.
execStatements(t, []string{"insert into t1 values(1)"})
expectDBClientQueries(t, []string{
"begin",
"insert into t1(id) values (1)",
"/update _vt.vreplication set pos=",
"commit",
})

execStatements(t, []string{
"begin",
"insert into t1(id) values (2)",
"savepoint a",
"insert into t1(id) values (3)",
"release savepoint a",
"commit",
})
expectDBClientQueries(t, []string{
"begin",
"/insert into t1.*2.*",
"/insert into t1.*3.*",
"/update _vt.vreplication set pos=",
"commit",
})
cancel()
}

func TestPlayerStatementModeWithFilter(t *testing.T) {
defer deleteTablet(addTablet(100))

Expand Down
24 changes: 14 additions & 10 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,23 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
log.Info("Copy done, inserting events to stream")
insertRow(t, "t1", 1, numInitialRows+4)
insertRow(t, "t2", 2, numInitialRows+3)
insertRow(t, "t3", 3, numInitialRows+2)
insertRow(t, "t3", 3, numInitialRows+3)
// savepoints should not be sent in the event stream
execStatement(t, `
begin;
insert into t3 (id31, id32) values (12, 360);
savepoint a;
insert into t3 (id31, id32) values (13, 390);
release savepoint a;
commit;"
`)
}

numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 3*5 /* insert into t1/t2/t3 */ + 4 /* second insert into t3, no FieldEvent */
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit, savepoint ignored */
numExpectedEvents := numCopyEvents + numCatchupEvents + numFastForwardEvents + numMisc + numReplicateEvents

var lastRowEventSeen bool
Expand Down Expand Up @@ -501,9 +508,6 @@ var expectedEvents = []string{
"type:BEGIN",
"type:FIELD field_event:<table_name:\"t3\" fields:<name:\"id31\" type:INT32 > fields:<name:\"id32\" type:INT32 > > ",
"type:ROW row_event:<table_name:\"t3\" row_changes:<after:<lengths:2 lengths:3 values:\"12360\" > > > ",
"type:GTID",
"type:COMMIT",
"type:BEGIN",
"type:ROW row_event:<table_name:\"t3\" row_changes:<after:<lengths:2 lengths:3 values:\"13390\" > > > ",
"type:GTID",
"type:COMMIT",
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
curSize += newSize
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_SAVEPOINT:
// no-op, we do not send savepoint event on
default:
return fmt.Errorf("unexpected event: %v", vevent)
}
Expand Down Expand Up @@ -308,6 +310,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog

// parseEvent parses an event from the binlog and converts it to a list of VEvents.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {

if !ev.IsValid() {
return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
}
Expand Down Expand Up @@ -437,6 +440,11 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Type: binlogdatapb.VEventType_OTHER,
})
}
vs.se.ReloadAt(context.Background(), vs.pos)
case sqlparser.StmtSavepoint:
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_SAVEPOINT,
})
case sqlparser.StmtOther, sqlparser.StmtPriv:
// These are either:
// 1) DBA statements like REPAIR that can be ignored.
Expand Down
1 change: 1 addition & 0 deletions proto/binlogdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ enum VEventType {
JOURNAL = 16;
VERSION = 17;
LASTPK = 18;
SAVEPOINT = 19;
}

// RowChange represents one row change.
Expand Down

0 comments on commit 50a5009

Please sign in to comment.