diff --git a/go/test/endtoend/vtgate/foreignkey/fk_test.go b/go/test/endtoend/vtgate/foreignkey/fk_test.go index 3f59a247273..d50e66cbd13 100644 --- a/go/test/endtoend/vtgate/foreignkey/fk_test.go +++ b/go/test/endtoend/vtgate/foreignkey/fk_test.go @@ -17,12 +17,19 @@ limitations under the License. package foreignkey import ( + "context" + "io" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) // TestInsertWithFK tests that insertions work as expected when foreign key management is enabled in Vitess. @@ -101,7 +108,7 @@ func TestDeleteWithFK(t *testing.T) { utils.AssertMatches(t, conn, `select * from u_t2`, `[[INT64(342) NULL] [INT64(19) INT64(1234)]]`) } -// TestUpdations tests that update work as expected when foreign key management is enabled in Vitess. +// TestUpdateWithFK tests that update work as expected when foreign key management is enabled in Vitess. func TestUpdateWithFK(t *testing.T) { mcmp, closer := start(t) conn := mcmp.VtConn @@ -162,6 +169,125 @@ func TestUpdateWithFK(t *testing.T) { utils.AssertMatches(t, conn, `select * from u_t3 order by id`, `[[INT64(1) INT64(12)] [INT64(32) INT64(13)]]`) } +// TestVstreamForFKBinLog tests that dml queries with fks are written with child row first approach in the binary logs. +func TestVstreamForFKBinLog(t *testing.T) { + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + mcmp, closer := start(t) + conn := mcmp.VtConn + defer closer() + defer cancel() + + utils.Exec(t, conn, `use uks`) + + // insert some data. + utils.Exec(t, conn, `insert into u_t1(id, col1) values (1,2), (11,4), (111,6)`) + utils.Exec(t, conn, `insert into u_t2(id, col2) values (2,2), (22,4)`) + utils.Exec(t, conn, `insert into u_t3(id, col3) values (33,4), (333,6)`) + // drain 3 row events. + _ = drainEvents(t, ch, 3) + + tcases := []struct { + query string + events int + rowEvents []string + }{{ + query: `update u_t1 set col1 = 3 where id = 11`, + events: 3, + rowEvents: []string{ + `table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"334"} after:{lengths:2 lengths:1 values:"333"}} keyspace:"uks" shard:"0" flags:3`, + `table_name:"uks.u_t2" row_changes:{before:{lengths:2 lengths:1 values:"224"} after:{lengths:2 lengths:-1 values:"22"}} keyspace:"uks" shard:"0" flags:1`, + `table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"114"} after:{lengths:2 lengths:1 values:"113"}} keyspace:"uks" shard:"0" flags:1`, + }, + }, { + query: `update u_t1 set col1 = 5 where id = 11`, + events: 2, + rowEvents: []string{ + `table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"333"} after:{lengths:2 lengths:1 values:"335"}} keyspace:"uks" shard:"0" flags:3`, + `table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"113"} after:{lengths:2 lengths:1 values:"115"}} keyspace:"uks" shard:"0" flags:1`, + }, + }, { + query: `delete from u_t1 where col1 = 6`, + events: 2, + rowEvents: []string{ + `table_name:"uks.u_t3" row_changes:{before:{lengths:3 lengths:1 values:"3336"}} keyspace:"uks" shard:"0" flags:1`, + `table_name:"uks.u_t1" row_changes:{before:{lengths:3 lengths:1 values:"1116"}} keyspace:"uks" shard:"0" flags:1`, + }, + }, { + query: `update u_t1 set col1 = null where id = 11`, + events: 2, + rowEvents: []string{ + `table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"335"} after:{lengths:2 lengths:-1 values:"33"}} keyspace:"uks" shard:"0" flags:3`, + `table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"115"} after:{lengths:2 lengths:-1 values:"11"}} keyspace:"uks" shard:"0" flags:1`, + }, + }, { + query: `delete from u_t1 where id = 11`, + events: 1, + rowEvents: []string{ + `table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:-1 values:"11"}} keyspace:"uks" shard:"0" flags:1`, + }, + }} + for _, tcase := range tcases { + t.Run(tcase.query, func(t *testing.T) { + utils.Exec(t, conn, tcase.query) + // drain row events. + rowEvents := drainEvents(t, ch, tcase.events) + assert.ElementsMatch(t, tcase.rowEvents, rowEvents) + }) + } +} + +func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent, vtgateConn *vtgateconn.VTGateConn) { + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + {Keyspace: unshardedKs, Shard: "0", Gtid: "current"}, + }} + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/u.*", + }}, + } + vReader, err := vtgateConn.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, filter, nil) + require.NoError(t, err) + + go func() { + for { + evs, err := vReader.Recv() + if err == io.EOF || ctx.Err() != nil { + return + } + require.NoError(t, err) + + for _, ev := range evs { + if ev.Type == binlogdatapb.VEventType_ROW { + ch <- ev + } + } + } + }() +} + +func drainEvents(t *testing.T, ch chan *binlogdatapb.VEvent, count int) []string { + var rowEvents []string + for i := 0; i < count; i++ { + select { + case re := <-ch: + rowEvents = append(rowEvents, re.RowEvent.String()) + case <-time.After(10 * time.Second): + t.Fatalf("timeout waiting for event number: %d", i+1) + } + } + return rowEvents +} + // TestFkScenarios tests the various foreign key scenarios with different constraints // and makes sure that Vitess works with them as expected. All the tables are present in both sharded and unsharded keyspace // and all the foreign key constraints are cross-shard ones for the sharded keyspace. diff --git a/go/test/endtoend/vtgate/foreignkey/main_test.go b/go/test/endtoend/vtgate/foreignkey/main_test.go index cf0c76b5404..fc42c56f311 100644 --- a/go/test/endtoend/vtgate/foreignkey/main_test.go +++ b/go/test/endtoend/vtgate/foreignkey/main_test.go @@ -25,19 +25,19 @@ import ( "github.com/stretchr/testify/require" - "vitess.io/vitess/go/test/endtoend/utils" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" ) var ( - clusterInstance *cluster.LocalProcessCluster - vtParams mysql.ConnParams - mysqlParams mysql.ConnParams - shardedKs = "ks" - unshardedKs = "uks" - Cell = "test" + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + mysqlParams mysql.ConnParams + vtgateGrpcAddress string + shardedKs = "ks" + unshardedKs = "uks" + Cell = "test" //go:embed sharded_schema.sql shardedSchemaSQL string @@ -82,7 +82,7 @@ func TestMain(m *testing.M) { SchemaSQL: unshardedSchemaSQL, VSchema: unshardedVSchema, } - err = clusterInstance.StartUnshardedKeyspace(*uKs, 0, false) + err = clusterInstance.StartUnshardedKeyspace(*uKs, 1, false) if err != nil { return 1 } @@ -101,6 +101,7 @@ func TestMain(m *testing.M) { Host: clusterInstance.Hostname, Port: clusterInstance.VtgateMySQLPort, } + vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) connParams, closer, err := utils.NewMySQL(clusterInstance, shardedKs, shardedSchemaSQL) if err != nil {