Skip to content

Commit

Permalink
Fix multi rows in one txn (pingcap#958)
Browse files Browse the repository at this point in the history
fix issue pingcap#932
this

Co-authored-by: WangXiangUSTC <wx347249478@gmail.com>
  • Loading branch information
july2993 and WangXiangUSTC authored May 29, 2020
1 parent 08fe95b commit 05a5c51
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 6 deletions.
15 changes: 9 additions & 6 deletions drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func TiBinlogToSlaveBinlog(
}
return slaveBinlog, nil
}

slaveBinlog := &obinlog.Binlog{
Type: obinlog.BinlogType_DML,
CommitTs: tiBinlog.GetCommitTs(),
Expand All @@ -71,15 +72,18 @@ func TiBinlogToSlaveBinlog(
}

iter := newSequenceIterator(&mut)
table := genTable(schema, info)
slaveBinlog.DmlData.Tables = append(slaveBinlog.DmlData.Tables, table)

for {
table, err := nextRow(schema, info, isTblDroppingCol, iter)
tableMutation, err := nextRow(schema, info, isTblDroppingCol, iter)
if err != nil {
if errors.Cause(err) == io.EOF {
break
}
return nil, errors.Trace(err)
}
slaveBinlog.DmlData.Tables = append(slaveBinlog.DmlData.Tables, table)
table.Mutations = append(table.Mutations, tableMutation)
}
}
return slaveBinlog, nil
Expand Down Expand Up @@ -311,7 +315,7 @@ func createTableMutation(tp pb.MutationType, info *model.TableInfo, isTblDroppin
return mut, nil
}

func nextRow(schema string, info *model.TableInfo, isTblDroppingCol bool, iter *sequenceIterator) (*obinlog.Table, error) {
func nextRow(schema string, info *model.TableInfo, isTblDroppingCol bool, iter *sequenceIterator) (*obinlog.TableMutation, error) {
mutType, row, err := iter.next()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -321,7 +325,6 @@ func nextRow(schema string, info *model.TableInfo, isTblDroppingCol bool, iter *
if err != nil {
return nil, errors.Trace(err)
}
table := genTable(schema, info)
table.Mutations = append(table.Mutations, tableMutation)
return table, nil

return tableMutation, nil
}
24 changes: 24 additions & 0 deletions drainer/translator/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,30 @@ func (t *testKafkaSuite) testDML(c *check.C, tp obinlog.MutationType) {
}
}

func (t *testKafkaSuite) TestAllDML(c *check.C) {
t.SetAllDML(c)

slaveBinog, err := TiBinlogToSlaveBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
c.Assert(err, check.IsNil)

c.Assert(slaveBinog.Type, check.Equals, obinlog.BinlogType_DML)
c.Assert(slaveBinog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())

table := slaveBinog.DmlData.Tables[0]

insertMut := table.Mutations[0]
updateMut := table.Mutations[1]
deleteMut := table.Mutations[2]
c.Assert(insertMut.GetType(), check.Equals, obinlog.MutationType_Insert)
c.Assert(updateMut.GetType(), check.Equals, obinlog.MutationType_Update)
c.Assert(deleteMut.GetType(), check.Equals, obinlog.MutationType_Delete)

checkColumns(c, table.ColumnInfo, insertMut.Row.Columns, t.getDatums())
checkColumns(c, table.ColumnInfo, deleteMut.Row.Columns, t.getDatums())
checkColumns(c, table.ColumnInfo, updateMut.Row.Columns, t.getDatums())
checkColumns(c, table.ColumnInfo, updateMut.ChangeRow.Columns, t.getOldDatums())
}

func (t *testKafkaSuite) TestInsert(c *check.C) {
t.SetInsert(c)

Expand Down
27 changes: 27 additions & 0 deletions drainer/translator/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,33 @@ func (g *BinlogGenerator) SetInsert(c *check.C) {
})
}

// SetAllDML one insert/update/delete/update in one txn.
func (g *BinlogGenerator) SetAllDML(c *check.C) {
g.reset()
info := g.setEvent(c)

mut := ti.TableMutation{
TableId: info.ID,
}

// insert
row := testGenInsertBinlog(c, info, g.datums)
mut.InsertedRows = append(mut.InsertedRows, row)
mut.Sequence = append(mut.Sequence, ti.MutationType_Insert)

// update
row = testGenUpdateBinlog(c, info, g.oldDatums, g.datums)
mut.UpdatedRows = append(mut.UpdatedRows, row)
mut.Sequence = append(mut.Sequence, ti.MutationType_Update)

// delete
row = testGenDeleteBinlog(c, info, g.datums)
mut.DeletedRows = append(mut.DeletedRows, row)
mut.Sequence = append(mut.Sequence, ti.MutationType_DeleteRow)

g.PV.Mutations = append(g.PV.Mutations, mut)
}

// SetUpdate set up a update event binlog.
func (g *BinlogGenerator) SetUpdate(c *check.C) {
g.reset()
Expand Down

0 comments on commit 05a5c51

Please sign in to comment.