Skip to content

Commit

Permalink
Merge pull request #6772 from planetscale/rn-vr-set-stmt
Browse files Browse the repository at this point in the history
VStreamer: handle SET statements in the binlog stream
  • Loading branch information
sougou authored Sep 24, 2020
2 parents 9219d12 + 99c1bd9 commit c02b7d5
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func Init() (*Env, error) {
os.RemoveAll(te.cluster.Config.SchemaDir)
return nil, fmt.Errorf("could not launch mysql: %v", err)
}

te.Dbcfgs = dbconfigs.NewTestDBConfigs(te.cluster.MySQLConnParams(), te.cluster.MySQLAppDebugConnParams(), te.cluster.DbName())
config := tabletenv.NewDefaultConfig()
config.DB = te.Dbcfgs
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog

// parseEvent parses an event from the binlog and converts it to a list of VEvents.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {

if !ev.IsValid() {
return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
}
Expand Down Expand Up @@ -383,6 +382,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
// Insert/Delete/Update are supported only to be used in the context of external mysql streams where source databases
// could be using SBR. Vitess itself will never run into cases where it needs to consume non rbr statements.

switch cat := sqlparser.Preview(q.SQL); cat {
case sqlparser.StmtInsert:
mustSend := mustSendStmt(q, params.DbName)
Expand Down Expand Up @@ -455,7 +455,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Statement: q.SQL,
})
}
case sqlparser.StmtOther, sqlparser.StmtPriv:
case sqlparser.StmtOther, sqlparser.StmtPriv, sqlparser.StmtSet:
// These are either:
// 1) DBA statements like REPAIR that can be ignored.
// 2) Privilege-altering statements like GRANT/REVOKE
Expand Down Expand Up @@ -538,6 +538,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
if err != nil {
return nil, err
}

}
for _, vevent := range vevents {
vevent.Timestamp = int64(ev.Timestamp())
Expand Down
59 changes: 59 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,57 @@ type testcase struct {
output [][]string
}

func checkIfOptionIsSupported(t *testing.T, variable string) bool {
qr, err := env.Mysqld.FetchSuperQuery(context.Background(), fmt.Sprintf("show variables like '%s'", variable))
require.NoError(t, err)
require.NotNil(t, qr)
if qr.Rows != nil && len(qr.Rows) == 1 {
return true
}
return false
}

func TestSetStatement(t *testing.T) {

if testing.Short() {
t.Skip()
}
if !checkIfOptionIsSupported(t, "log_builtin_as_identified_by_password") {
// the combination of setting this option and support for "set password" only works on a few flavors
log.Info("Cannot test SetStatement on this flavor")
return
}

execStatements(t, []string{
"create table t1(id int, val varbinary(128), primary key(id))",
})
defer execStatements(t, []string{
"drop table t1",
})
engine.se.Reload(context.Background())
queries := []string{
"begin",
"insert into t1 values (1, 'aaa')",
"commit",
"set global log_builtin_as_identified_by_password=1",
"SET PASSWORD FOR 'vt_appdebug'@'localhost'='*AA17DA66C7C714557F5485E84BCAFF2C209F2F53'", //select password('vtappdebug_password');
}
testcases := []testcase{{
input: queries,
output: [][]string{{
`begin`,
`type:FIELD field_event:<table_name:"t1" fields:<name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 > fields:<name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 > > `,
`type:ROW row_event:<table_name:"t1" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
`gtid`,
`commit`,
}, {
`gtid`,
`other`,
}},
}}
runCases(t, nil, testcases, "current", nil)
}

func TestVersion(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -1706,6 +1757,14 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
if evs[i].Type != binlogdatapb.VEventType_COMMIT {
t.Fatalf("%v (%d): event: %v, want commit", input, i, evs[i])
}
case "other":
if evs[i].Type != binlogdatapb.VEventType_OTHER {
t.Fatalf("%v (%d): event: %v, want other", input, i, evs[i])
}
case "ddl":
if evs[i].Type != binlogdatapb.VEventType_DDL {
t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i])
}
default:
evs[i].Timestamp = 0
if evs[i].Type == binlogdatapb.VEventType_FIELD {
Expand Down

0 comments on commit c02b7d5

Please sign in to comment.