Skip to content

Commit

Permalink
release mysql dependence in syncer test (1) (pingcap#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored and IANTHEREAL committed Aug 1, 2019
1 parent 464b81e commit 7cc671a
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 218 deletions.
2 changes: 1 addition & 1 deletion pkg/binlog/event/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID g
events = append(events, tableMapEv)

// RowsEvent
rowsEv, err2 := GenRowsEvent(header, latestPos, eventType, data.TableID, defaultRowsFlag, data.Rows, data.ColumnType)
rowsEv, err2 := GenRowsEvent(header, latestPos, eventType, data.TableID, defaultRowsFlag, data.Rows, data.ColumnType, tableMapEv)
if err2 != nil {
return nil, errors.Annotatef(err2, "generate RowsEvent for `%s`.`%s`", data.Schema, data.Table)
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func GenTableMapEvent(header *replication.EventHeader, latestPos uint32, tableID
// DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2
// ref: https://dev.mysql.com/doc/internals/en/rows-event.html
// ref: http://blog.51cto.com/yanzongshuai/2090894
func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType replication.EventType, tableID uint64, rowsFlags uint16, rows [][]interface{}, columnType []byte) (*replication.BinlogEvent, error) {
func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType replication.EventType, tableID uint64, rowsFlags uint16, rows [][]interface{}, columnType []byte, tableMapEv *replication.BinlogEvent) (*replication.BinlogEvent, error) {
switch eventType {
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2,
replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2,
Expand Down Expand Up @@ -675,9 +675,11 @@ func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType r
}

// parse TableMapEvent
tableMapEv, err := GenTableMapEvent(header, latestPos, tableID, []byte("schema-placeholder"), []byte("table-placeholder"), columnType)
if err != nil {
return nil, errors.Annotate(err, "generate TableMapEvent")
if tableMapEv == nil {
tableMapEv, err = GenTableMapEvent(header, latestPos, tableID, []byte("schema-placeholder"), []byte("table-placeholder"), columnType)
if err != nil {
return nil, errors.Annotate(err, "generate TableMapEvent")
}
}
_, err = parse2.ParseSingleEvent(bytes.NewReader(tableMapEv.RawData), onEventFunc)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,26 +490,26 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) {
)

// invalid eventType, rows and columnType
rowsEv, err := GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err := GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, NotNil)
c.Assert(rowsEv, IsNil)

// valid eventType, invalid rows and columnType
eventType = replication.WRITE_ROWS_EVENTv0
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, NotNil)
c.Assert(rowsEv, IsNil)

// valid eventType and rows, invalid columnType
row := []interface{}{int32(1)}
rows = append(rows, row)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, NotNil)
c.Assert(rowsEv, IsNil)

// all valid
columnType = []byte{gmysql.MYSQL_TYPE_LONG}
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, IsNil)
c.Assert(rowsEv, NotNil)

Expand All @@ -530,7 +530,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) {

// multi rows, with different length, invalid
rows = append(rows, []interface{}{int32(1), int32(2)})
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, NotNil)
c.Assert(rowsEv, IsNil)

Expand All @@ -539,7 +539,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) {
rows = append(rows, []interface{}{int32(1), int32(2)})
rows = append(rows, []interface{}{int32(3), int32(4)})
columnType = []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_LONG}
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, IsNil)
c.Assert(rowsEv, NotNil)
// verify the body
Expand All @@ -556,7 +556,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) {
replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2,
}
for _, eventType = range evTypes {
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, IsNil)
c.Assert(rowsEv, NotNil)
c.Assert(rowsEv.Header.EventType, Equals, eventType)
Expand All @@ -568,7 +568,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) {
float32(1.23), float64(4.56), "string with type STRING"})
columnType = []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_TINY, gmysql.MYSQL_TYPE_SHORT, gmysql.MYSQL_TYPE_INT24, gmysql.MYSQL_TYPE_LONGLONG,
gmysql.MYSQL_TYPE_FLOAT, gmysql.MYSQL_TYPE_DOUBLE, gmysql.MYSQL_TYPE_STRING}
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, IsNil)
c.Assert(rowsEv, NotNil)
// verify the body
Expand All @@ -580,7 +580,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) {

// column type mismatch
rows[0][0] = int8(1)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, NotNil)
c.Assert(rowsEv, IsNil)

Expand All @@ -596,7 +596,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) {
gmysql.MYSQL_TYPE_BLOB, gmysql.MYSQL_TYPE_JSON, gmysql.MYSQL_TYPE_GEOMETRY}
for i := range unsupportedTypes {
columnType = unsupportedTypes[i : i+1]
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "not supported"), IsTrue)
c.Assert(rowsEv, IsNil)
Expand Down
1 change: 0 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
s.tctx.L().Info("replicate binlog from checkpoint", zap.Stringer("checkpoint", lastPos))

s.streamer, err = s.streamerProducer.generateStreamer(lastPos)

if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 7cc671a

Please sign in to comment.