Skip to content

Commit

Permalink
Vrepl Savepoint: change vevent proto: ddl field name to statement, us…
Browse files Browse the repository at this point in the history
…e statement for dml if present, dml field is deprecated, will be removed in 8.0. set statement for savepoint and apply in vrepl/send in vstream client

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Jul 16, 2020
1 parent 50a5009 commit 250caba
Show file tree
Hide file tree
Showing 10 changed files with 229 additions and 172 deletions.
4 changes: 2 additions & 2 deletions go/cmd/vtgateclienttest/services/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletTy
Type: 1,
Timestamp: 1234,
Gtid: "echo-gtid-1",
Ddl: "echo-ddl-1",
Statement: "echo-ddl-1",
Vgtid: vgtid,
RowEvent: &binlogdatapb.RowEvent{
TableName: "echo-table-1",
Expand All @@ -160,7 +160,7 @@ func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletTy
Type: 2,
Timestamp: 4321,
Gtid: "echo-gtid-2",
Ddl: "echo-ddl-2",
Statement: "echo-ddl-2",
Vgtid: vgtid,
FieldEvent: &binlogdatapb.FieldEvent{
TableName: "echo-table-2",
Expand Down
251 changes: 127 additions & 124 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestSchemaVersioning(t *testing.T) {
`gtid`, //gtid+other => vstream current pos
`other`,
`gtid`, //gtid+ddl => actual query
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `},
`type:DDL statement:"create table vitess_version (id1 int, id2 int)" `},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
Expand All @@ -102,7 +102,7 @@ func TestSchemaVersioning(t *testing.T) {
query: "alter table vitess_version add column id3 int",
output: []string{
`gtid`,
`type:DDL ddl:"alter table vitess_version add column id3 int" `,
`type:DDL statement:"alter table vitess_version add column id3 int" `,
`version`,
`gtid`,
},
Expand All @@ -117,7 +117,7 @@ func TestSchemaVersioning(t *testing.T) {
query: "alter table vitess_version modify column id3 varbinary(16)",
output: []string{
`gtid`,
`type:DDL ddl:"alter table vitess_version modify column id3 varbinary(16)" `,
`type:DDL statement:"alter table vitess_version modify column id3 varbinary(16)" `,
`version`,
`gtid`,
},
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestSchemaVersioning(t *testing.T) {
query: "/**/alter table vitess_version add column id4 varbinary(16)",
output: []string{
`gtid`, //no tracker, so no insert into schema_version or version event
`type:DDL ddl:"alter table vitess_version add column id4 varbinary(16)" `,
`type:DDL statement:"alter table vitess_version add column id4 varbinary(16)" `,
},
}, {
query: "insert into vitess_version values(4, 40, 'FFF', 'GGGG' )",
Expand Down Expand Up @@ -215,29 +215,29 @@ func TestSchemaVersioning(t *testing.T) {
// playing events from the past: same events as original since historian is providing the latest schema
output := append(append([]string{
`gtid`,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `},
`type:DDL statement:"create table vitess_version (id1 int, id2 int)" `},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version add column id3 int" `,
`type:DDL statement:"alter table vitess_version add column id3 int" `,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:INT32 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"220200" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version modify column id3 varbinary(16)" `,
`type:DDL statement:"alter table vitess_version modify column id3 varbinary(16)" `,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"330TTT" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version add column id4 varbinary(16)" `,
`type:DDL statement:"alter table vitess_version add column id4 varbinary(16)" `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
Expand Down Expand Up @@ -279,23 +279,23 @@ func TestSchemaVersioning(t *testing.T) {
// playing events from the past: same as earlier except one below, see comments
output = append(append([]string{
`gtid`,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `},
`type:DDL statement:"create table vitess_version (id1 int, id2 int)" `},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version add column id3 int" `,
`type:DDL statement:"alter table vitess_version add column id3 int" `,
`version`,
`gtid`,
/*at this point we only have latest schema so we have types (int32, int32, varbinary, varbinary) so the types don't match. Hence the @ fieldnames*/
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"@1" type:INT32 > fields:<name:"@2" type:INT32 > fields:<name:"@3" type:INT32 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"220200" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version modify column id3 varbinary(16)" `,
`type:DDL statement:"alter table vitess_version modify column id3 varbinary(16)" `,
`version`,
`gtid`,
/*at this point we only have latest schema so we have types (int32, int32, varbinary, varbinary),
Expand All @@ -304,7 +304,7 @@ func TestSchemaVersioning(t *testing.T) {
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"330TTT" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version add column id4 varbinary(16)" `,
`type:DDL statement:"alter table vitess_version add column id4 varbinary(16)" `,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
Expand Down
29 changes: 19 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,12 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
}

func (vp *vplayer) applyStmtEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
if vp.canAcceptStmtEvents {
_, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Dml)
sql := event.Statement
if sql == "" {
sql = event.Dml
}
if event.Type == binlogdatapb.VEventType_SAVEPOINT || vp.canAcceptStmtEvents {
_, err := vp.vr.dbClient.ExecuteWithRetry(ctx, sql)
return err
}
return fmt.Errorf("filter rules are not supported for SBR replication: %v", vp.vr.source.Filter.GetRules())
Expand Down Expand Up @@ -419,8 +423,13 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
stats.Send(fmt.Sprintf("%v", event.FieldEvent))

case binlogdatapb.VEventType_INSERT, binlogdatapb.VEventType_DELETE, binlogdatapb.VEventType_UPDATE, binlogdatapb.VEventType_REPLACE:
// use Statement if available, preparing for deprecation in 8.0
sql := event.Statement
if sql == "" {
sql = event.Dml
}
// If the event is for one of the AWS RDS "special" tables, we skip
if !strings.Contains(event.Dml, " mysql.rds_") {
if !strings.Contains(sql, " mysql.rds_") {
// This is a player using stament based replication
if err := vp.vr.dbClient.Begin(); err != nil {
return err
Expand All @@ -429,7 +438,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if err := vp.applyStmtEvent(ctx, event); err != nil {
return err
}
stats.Send(fmt.Sprintf(event.Dml))
stats.Send(sql)
}
case binlogdatapb.VEventType_ROW:
// This player is configured for row based replication
Expand Down Expand Up @@ -478,7 +487,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
if _, err := vp.updatePos(event.Timestamp); err != nil {
return err
}
if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stopped at DDL %s", event.Ddl)); err != nil {
if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("Stopped at DDL %s", event.Statement)); err != nil {
return err
}
if err := vp.vr.dbClient.Commit(); err != nil {
Expand All @@ -490,10 +499,10 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
// So, we apply the DDL first, and then save the position.
// Manual intervention may be needed if there is a partial
// failure here.
if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Ddl); err != nil {
if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Statement); err != nil {
return err
}
stats.Send(fmt.Sprintf("%v", event.Ddl))
stats.Send(fmt.Sprintf("%v", event.Statement))
posReached, err := vp.updatePos(event.Timestamp)
if err != nil {
return err
Expand All @@ -502,10 +511,10 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return io.EOF
}
case binlogdatapb.OnDDLAction_EXEC_IGNORE:
if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Ddl); err != nil {
log.Infof("Ignoring error: %v for DDL: %s", err, event.Ddl)
if _, err := vp.vr.dbClient.ExecuteWithRetry(ctx, event.Statement); err != nil {
log.Infof("Ignoring error: %v for DDL: %s", err, event.Statement)
}
stats.Send(fmt.Sprintf("%v", event.Ddl))
stats.Send(fmt.Sprintf("%v", event.Statement))
posReached, err := vp.updatePos(event.Timestamp)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (tr *Tracker) process(ctx context.Context) {
gtid = event.Gtid
}
if event.Type == binlogdatapb.VEventType_DDL {
if err := tr.schemaUpdated(gtid, event.Ddl, event.Timestamp); err != nil {
if err := tr.schemaUpdated(gtid, event.Statement, event.Timestamp); err != nil {
tr.env.Stats().ErrorCounters.Add(vtrpcpb.Code_INTERNAL.String(), 1)
log.Errorf("Error updating schema: %s", sqlparser.TruncateForLog(err.Error()))
}
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vttablet/tabletserver/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,22 @@ func TestTracker(t *testing.T) {
Type: binlogdatapb.VEventType_GTID,
Gtid: gtid1,
}, {
Type: binlogdatapb.VEventType_DDL,
Ddl: ddl1,
Type: binlogdatapb.VEventType_DDL,
Statement: ddl1,
},
{
Type: binlogdatapb.VEventType_GTID,
Gtid: "",
Type: binlogdatapb.VEventType_GTID,
Statement: "",
}, {
Type: binlogdatapb.VEventType_DDL,
Ddl: ddl1,
Type: binlogdatapb.VEventType_DDL,
Statement: ddl1,
},
{
Type: binlogdatapb.VEventType_GTID,
Gtid: gtid1,
}, {
Type: binlogdatapb.VEventType_DDL,
Ddl: "",
Type: binlogdatapb.VEventType_DDL,
Statement: "",
},
}},
}
Expand Down
9 changes: 7 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ begin;
insert into t3 (id31, id32) values (12, 360);
savepoint a;
insert into t3 (id31, id32) values (13, 390);
release savepoint a;
rollback work to savepoint a;
savepoint b;
insert into t3 (id31, id32) values (13, 390);
release savepoint b;
commit;"
`)
}
Expand All @@ -244,7 +247,7 @@ commit;"
numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit, savepoint ignored */
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 8 /* begin/field/2 inserts/gtid/commit + 2 savepoints */
numExpectedEvents := numCopyEvents + numCatchupEvents + numFastForwardEvents + numMisc + numReplicateEvents

var lastRowEventSeen bool
Expand Down Expand Up @@ -508,6 +511,8 @@ var expectedEvents = []string{
"type:BEGIN",
"type:FIELD field_event:<table_name:\"t3\" fields:<name:\"id31\" type:INT32 > fields:<name:\"id32\" type:INT32 > > ",
"type:ROW row_event:<table_name:\"t3\" row_changes:<after:<lengths:2 lengths:3 values:\"12360\" > > > ",
"type:SAVEPOINT statement:\"SAVEPOINT `a`\"",
"type:SAVEPOINT statement:\"SAVEPOINT `b`\"",
"type:ROW row_event:<table_name:\"t3\" row_changes:<after:<lengths:2 lengths:3 values:\"13390\" > > > ",
"type:GTID",
"type:COMMIT",
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
curSize += newSize
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_SAVEPOINT:
// no-op, we do not send savepoint event on
bufferedEvents = append(bufferedEvents, vevent)
default:
return fmt.Errorf("unexpected event: %v", vevent)
}
Expand Down Expand Up @@ -424,8 +424,8 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Type: binlogdatapb.VEventType_GTID,
Gtid: mysql.EncodePosition(vs.pos),
}, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_DDL,
Ddl: q.SQL,
Type: binlogdatapb.VEventType_DDL,
Statement: q.SQL,
})
// Reload schema only if the DDL change is relevant.
// TODO(sougou): move this back to always load after
Expand All @@ -443,7 +443,8 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
vs.se.ReloadAt(context.Background(), vs.pos)
case sqlparser.StmtSavepoint:
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_SAVEPOINT,
Type: binlogdatapb.VEventType_SAVEPOINT,
Statement: q.SQL,
})
case sqlparser.StmtOther, sqlparser.StmtPriv:
// These are either:
Expand Down
Loading

0 comments on commit 250caba

Please sign in to comment.