diff --git a/drainer/translator/kafka.go b/drainer/translator/kafka.go index ec083ed04..1d39dd63d 100644 --- a/drainer/translator/kafka.go +++ b/drainer/translator/kafka.go @@ -52,6 +52,7 @@ func TiBinlogToSlaveBinlog( } return slaveBinlog, nil } + slaveBinlog := &obinlog.Binlog{ Type: obinlog.BinlogType_DML, CommitTs: tiBinlog.GetCommitTs(), @@ -71,15 +72,18 @@ func TiBinlogToSlaveBinlog( } iter := newSequenceIterator(&mut) + table := genTable(schema, info) + slaveBinlog.DmlData.Tables = append(slaveBinlog.DmlData.Tables, table) + for { - table, err := nextRow(schema, info, isTblDroppingCol, iter) + tableMutation, err := nextRow(schema, info, isTblDroppingCol, iter) if err != nil { if errors.Cause(err) == io.EOF { break } return nil, errors.Trace(err) } - slaveBinlog.DmlData.Tables = append(slaveBinlog.DmlData.Tables, table) + table.Mutations = append(table.Mutations, tableMutation) } } return slaveBinlog, nil @@ -311,7 +315,7 @@ func createTableMutation(tp pb.MutationType, info *model.TableInfo, isTblDroppin return mut, nil } -func nextRow(schema string, info *model.TableInfo, isTblDroppingCol bool, iter *sequenceIterator) (*obinlog.Table, error) { +func nextRow(schema string, info *model.TableInfo, isTblDroppingCol bool, iter *sequenceIterator) (*obinlog.TableMutation, error) { mutType, row, err := iter.next() if err != nil { return nil, errors.Trace(err) @@ -321,7 +325,6 @@ func nextRow(schema string, info *model.TableInfo, isTblDroppingCol bool, iter * if err != nil { return nil, errors.Trace(err) } - table := genTable(schema, info) - table.Mutations = append(table.Mutations, tableMutation) - return table, nil + + return tableMutation, nil } diff --git a/drainer/translator/kafka_test.go b/drainer/translator/kafka_test.go index 4eb4d3c2c..c25496d73 100644 --- a/drainer/translator/kafka_test.go +++ b/drainer/translator/kafka_test.go @@ -64,6 +64,30 @@ func (t *testKafkaSuite) testDML(c *check.C, tp obinlog.MutationType) { } } +func (t *testKafkaSuite) TestAllDML(c *check.C) { + t.SetAllDML(c) + + slaveBinog, err := TiBinlogToSlaveBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV) + c.Assert(err, check.IsNil) + + c.Assert(slaveBinog.Type, check.Equals, obinlog.BinlogType_DML) + c.Assert(slaveBinog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs()) + + table := slaveBinog.DmlData.Tables[0] + + insertMut := table.Mutations[0] + updateMut := table.Mutations[1] + deleteMut := table.Mutations[2] + c.Assert(insertMut.GetType(), check.Equals, obinlog.MutationType_Insert) + c.Assert(updateMut.GetType(), check.Equals, obinlog.MutationType_Update) + c.Assert(deleteMut.GetType(), check.Equals, obinlog.MutationType_Delete) + + checkColumns(c, table.ColumnInfo, insertMut.Row.Columns, t.getDatums()) + checkColumns(c, table.ColumnInfo, deleteMut.Row.Columns, t.getDatums()) + checkColumns(c, table.ColumnInfo, updateMut.Row.Columns, t.getDatums()) + checkColumns(c, table.ColumnInfo, updateMut.ChangeRow.Columns, t.getOldDatums()) +} + func (t *testKafkaSuite) TestInsert(c *check.C) { t.SetInsert(c) diff --git a/drainer/translator/testing.go b/drainer/translator/testing.go index 6859a3cb4..b67024b26 100644 --- a/drainer/translator/testing.go +++ b/drainer/translator/testing.go @@ -148,6 +148,33 @@ func (g *BinlogGenerator) SetInsert(c *check.C) { }) } +// SetAllDML one insert/update/delete/update in one txn. +func (g *BinlogGenerator) SetAllDML(c *check.C) { + g.reset() + info := g.setEvent(c) + + mut := ti.TableMutation{ + TableId: info.ID, + } + + // insert + row := testGenInsertBinlog(c, info, g.datums) + mut.InsertedRows = append(mut.InsertedRows, row) + mut.Sequence = append(mut.Sequence, ti.MutationType_Insert) + + // update + row = testGenUpdateBinlog(c, info, g.oldDatums, g.datums) + mut.UpdatedRows = append(mut.UpdatedRows, row) + mut.Sequence = append(mut.Sequence, ti.MutationType_Update) + + // delete + row = testGenDeleteBinlog(c, info, g.datums) + mut.DeletedRows = append(mut.DeletedRows, row) + mut.Sequence = append(mut.Sequence, ti.MutationType_DeleteRow) + + g.PV.Mutations = append(g.PV.Mutations, mut) +} + // SetUpdate set up a update event binlog. func (g *BinlogGenerator) SetUpdate(c *check.C) { g.reset()