Skip to content

Commit

Permalink
Merge pull request #2587 from HubSpot/ignore_rbr_errors
Browse files Browse the repository at this point in the history
just skip any unparseable rbr events
  • Loading branch information
alainjobart authored Feb 23, 2017
2 parents dd4fe43 + 61f482f commit d315c82
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 25 deletions.
36 changes: 20 additions & 16 deletions go/mysqlconn/replication/binlog_event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,28 +397,28 @@ func cellLength(data []byte, pos int, tmc *TableMapColumn) (int, error) {

// FIXME(alainjobart) are the ints signed? It seems Tiny is unsigned,
// but the others are.
func cellData(data []byte, pos int, tmc *TableMapColumn) (string, int) {
func cellData(data []byte, pos int, tmc *TableMapColumn) (string, int, error) {
switch tmc.Type {
case TypeTiny:
return fmt.Sprintf("%v", data[pos]), 1
return fmt.Sprintf("%v", data[pos]), 1, nil
case TypeShort, TypeYear:
val := binary.LittleEndian.Uint16(data[pos : pos+2])
return fmt.Sprintf("%v", val), 2
return fmt.Sprintf("%v", val), 2, nil
case TypeLong, TypeInt24:
val := binary.LittleEndian.Uint32(data[pos : pos+4])
return fmt.Sprintf("%v", val), 4
return fmt.Sprintf("%v", val), 4, nil
case TypeLongLong:
val := binary.LittleEndian.Uint64(data[pos : pos+8])
return fmt.Sprintf("%v", val), 8
return fmt.Sprintf("%v", val), 8, nil
case TypeTimestamp, TypeDate, TypeTime, TypeDateTime:
panic(fmt.Errorf("NYI"))
panic(fmt.Errorf("Not yet implemented type %v", tmc.Type))
case TypeVarchar:
// Varchar length is two bytes here.
l := int(uint64(data[pos]) |
uint64(data[pos+1])<<8)
return string(data[pos+2 : pos+2+l]), 2 + l
return string(data[pos+2 : pos+2+l]), 2 + l, nil
default:
panic(fmt.Errorf("Unsupported type %v", tmc.Type))
return "", 0, fmt.Errorf("Unsupported type %v", tmc.Type)
}
}

Expand Down Expand Up @@ -554,8 +554,7 @@ func (ev binlogEvent) Rows(f BinlogFormat, tm *TableMap) (Rows, error) {
}

// StringValues is a helper method to return the string value of all columns in a row in a Row.
// Will panic if anything goes wrong, this is meant for tests for now.
func (rs *Rows) StringValues(tm *TableMap, rowIndex int) []string {
func (rs *Rows) StringValues(tm *TableMap, rowIndex int) ([]string, error) {
var result []string

valueIndex := 0
Expand All @@ -574,18 +573,20 @@ func (rs *Rows) StringValues(tm *TableMap, rowIndex int) []string {
}

// We have real data
value, l := cellData(data, pos, &tm.Columns[c])
value, l, err := cellData(data, pos, &tm.Columns[c])
if err != nil {
return nil, err
}
result = append(result, value)
pos += l
valueIndex++
}

return result
return result, nil
}

// StringIdentifies is a helper method to return the string identify of all columns in a row in a Row.
// Will panic if anything goes wrong, this is meant for tests for now.
func (rs *Rows) StringIdentifies(tm *TableMap, rowIndex int) []string {
func (rs *Rows) StringIdentifies(tm *TableMap, rowIndex int) ([]string, error) {
var result []string

valueIndex := 0
Expand All @@ -604,11 +605,14 @@ func (rs *Rows) StringIdentifies(tm *TableMap, rowIndex int) []string {
}

// We have real data
value, l := cellData(data, pos, &tm.Columns[c])
value, l, err := cellData(data, pos, &tm.Columns[c])
if err != nil {
return nil, err
}
result = append(result, value)
pos += l
valueIndex++
}

return result
return result, nil
}
6 changes: 3 additions & 3 deletions go/mysqlconn/replication/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,11 @@ func TestRowsEvent(t *testing.T) {

// Test the Rows we just created, to be sure.
// 1076895760 is 0x40302010.
identifies := rows.StringIdentifies(tm, 0)
identifies, err := rows.StringIdentifies(tm, 0)
if expected := []string{"1076895760", "abc"}; !reflect.DeepEqual(identifies, expected) {
t.Fatalf("bad Rows idenfity, got %v expected %v", identifies, expected)
}
values := rows.StringValues(tm, 0)
values, err := rows.StringValues(tm, 0)
if expected := []string{"1076895760", "abcd"}; !reflect.DeepEqual(values, expected) {
t.Fatalf("bad Rows data, got %v expected %v", values, expected)
}
Expand All @@ -334,7 +334,7 @@ func TestRowsEvent(t *testing.T) {
t.Fatalf("NewRowsEvent().IsUpdateRows() if false")
}

event, _, err := event.StripChecksum(f)
event, _, err = event.StripChecksum(f)
if err != nil {
t.Fatalf("StripChecksum failed: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions go/mysqlconn/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func testRowReplicationWithRealDatabase(t *testing.T, params *sqldb.ConnParams)
}

// Check it has 2 rows, and first value is '10', second value is 'nice name'.
values := wr.StringValues(tableMap, 0)
values, _ := wr.StringValues(tableMap, 0)
t.Logf("Got WriteRows event data: %v %v", wr, values)
if expected := []string{"10", "nice name"}; !reflect.DeepEqual(values, expected) {
t.Fatalf("StringValues returned %v, expected %v", values, expected)
Expand All @@ -560,14 +560,14 @@ func testRowReplicationWithRealDatabase(t *testing.T, params *sqldb.ConnParams)
}

// Check it has 2 identify rows, and first value is '10', second value is 'nice name'.
values := ur.StringIdentifies(tableMap, 0)
values, _ := ur.StringIdentifies(tableMap, 0)
t.Logf("Got UpdateRows event identify: %v %v", ur, values)
if expected := []string{"10", "nice name"}; !reflect.DeepEqual(values, expected) {
t.Fatalf("StringIdentifies returned %v, expected %v", values, expected)
}

// Check it has 2 values rows, and first value is '10', second value is 'nicer name'.
values = ur.StringValues(tableMap, 0)
values, _ = ur.StringValues(tableMap, 0)
t.Logf("Got UpdateRows event data: %v %v", ur, values)
if expected := []string{"10", "nicer name"}; !reflect.DeepEqual(values, expected) {
t.Fatalf("StringValues returned %v, expected %v", values, expected)
Expand All @@ -584,7 +584,7 @@ func testRowReplicationWithRealDatabase(t *testing.T, params *sqldb.ConnParams)
}

// Check it has 2 rows, and first value is '10', second value is 'nicer name'.
values := dr.StringIdentifies(tableMap, 0)
values, _ := dr.StringIdentifies(tableMap, 0)
t.Logf("Got DeleteRows event identify: %v %v", dr, values)
if expected := []string{"10", "nicer name"}; !reflect.DeepEqual(values, expected) {
t.Fatalf("StringIdentifies returned %v, expected %v", values, expected)
Expand Down
12 changes: 10 additions & 2 deletions go/vt/binlog/binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,16 @@ func (bls *Streamer) parseEvents(ctx context.Context, events <-chan replication.
}
statements = append(statements, setTimestamp)
for i := range rows.Rows {
identifies := rows.StringIdentifies(tm, i)
values := rows.StringValues(tm, i)
identifies, err := rows.StringIdentifies(tm, i)
if err != nil {
log.Warningf("Failed to parse UPDATE due to error %v", err)
continue
}
values, err := rows.StringValues(tm, i)
if err != nil {
log.Warningf("Failed to parse UPDATE due to error %v", err)
continue
}
update := &binlogdatapb.BinlogTransaction_Statement{
Category: binlogdatapb.BinlogTransaction_Statement_BL_UPDATE,
Sql: []byte(fmt.Sprintf("WIP: update table %v set values = %v where identifies = %v", tm.Name, values, identifies)),
Expand Down

0 comments on commit d315c82

Please sign in to comment.