Skip to content

Commit

Permalink
Fix binary-column data corruption (#157)
Browse files Browse the repository at this point in the history
This commit addresses a data corruption bug in the binlog streaming
phase, where the binlog writer incorrectly propagates values in
fixed-length BINARY columns that have trailing 0s in the original value.
These trailing 0s are removed from the binlog by the SQL master and
therefore do not show up in the WHERE clause for update/delete
statements executed by the binlog writer.

NOTE: This commit requires changes to one of the vendor'ed modules in

    github.com/siddontang/go-mysql

that we patch directly in the local repo. We are working on getting
these changes into the upstream module and will need to merge these
changes once we can use the latest upstream module version.

Change-Id: Ib9c1b7308e8198f1fd38439c37f444d9a8154e6a
  • Loading branch information
Clemens Kolbitsch committed Mar 11, 2020
1 parent 76555c1 commit 1a9ea62
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 4 deletions.
34 changes: 30 additions & 4 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,13 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol

switch v := value.(type) {
case string:
return appendEscapedString(buffer, v)
var rightPadLengthForBinaryColumn uint
// see appendEscapedString() for details why we need special
// handling of BINARY column types
if column.Type == schema.TYPE_BINARY {
rightPadLengthForBinaryColumn = column.FixedSize
}
return appendEscapedString(buffer, v, rightPadLengthForBinaryColumn)
case []byte:
return appendEscapedBuffer(buffer, v, column.Type == schema.TYPE_JSON)
case bool:
Expand All @@ -362,7 +368,7 @@ func appendEscapedValue(buffer []byte, value interface{}, column schema.TableCol
case float32:
return strconv.AppendFloat(buffer, float64(v), 'g', -1, 64)
case decimal.Decimal:
return appendEscapedString(buffer, v.String())
return appendEscapedString(buffer, v.String(), 0)
default:
panic(fmt.Sprintf("unsupported type %t", value))
}
Expand Down Expand Up @@ -406,17 +412,37 @@ func Int64Value(value interface{}) (int64, bool) {
//
// ref: https://github.com/mysql/mysql-server/blob/mysql-5.7.5/mysys/charset.c#L963-L1038
// ref: https://github.com/go-sql-driver/mysql/blob/9181e3a86a19bacd63e68d43ae8b7b36320d8092/utils.go#L717-L758
func appendEscapedString(buffer []byte, value string) []byte {
//
// We need to support right-padding of the generated string using 0-bytes to
// mimic what a MySQL server would do for BINARY columns (with fixed length).
//
// ref: https://github.com/Shopify/ghostferry/pull/159
//
// This is specifically mentioned in the the below link:
//
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
//
// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html
func appendEscapedString(buffer []byte, value string, rightPadLengthForBinaryColumn uint) []byte {
buffer = append(buffer, '\'')

for i := 0; i < len(value); i++ {
var i int
for i = 0; i < len(value); i++ {
c := value[i]
if c == '\'' {
buffer = append(buffer, '\'', '\'')
} else {
buffer = append(buffer, c)
}
}
// continue 0-padding up to the desired length as provided by the
// caller
for ; i < int(rightPadLengthForBinaryColumn); i++ {
buffer = append(buffer, 0)
}

return append(buffer, '\'')
}
Expand Down
70 changes: 70 additions & 0 deletions test/integration/types_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,76 @@ def test_escaped_data
end
end

def test_copy_data_in_fixed_size_binary_column
# test for the BINARY columns needing 0-byte padding
#
# For details, see https://github.com/Shopify/ghostferry/pull/159

[source_db, target_db].each do |db|
db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
db.query("CREATE TABLE IF NOT EXISTS #{DEFAULT_FULL_TABLE_NAME} (id bigint(20) not null auto_increment, data BINARY(4), primary key(id))")
end

# NOTE: We explicitly test with trailing 0s, because the MySQL replication
# master will strip such trailing 0s when streaming events to us. As a
# result, the binlog writer must explicitly add then when building
# update/delete statements, as the WHERE clause would not match existing
# rows in the target DB
inserted_data = "ABC\x00"
updated_data = "EFGH"

source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, _binary'#{inserted_data}')")

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

row_copy_called = false
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
row_copy_called = true
# select row from the target and then make sure the data with 0 padding
# is present. We do this to make sure there are no races in the test
res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 1, res.count
res.each do |row|
assert_equal 1, row["id"]
assert_equal inserted_data, row["data"]
end

# now that the target is guaranteed to be in the same state as the
# source, trigger an update that will cause the binlog to stream an
# entry that needs the 0-byte padding
#
# NOTE: We could also do this via the hook
#
# Ghostferry::Status::BINLOG_STREAMING_STARTED
#
# but that puts us at risk of races within the test framework.
# See https://github.com/Shopify/ghostferry/issues/107
source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = _binary'#{updated_data}' WHERE id = 1")
end

binlog_apply_called = false
ghostferry.on_status(Ghostferry::Status::AFTER_BINLOG_APPLY) do
binlog_apply_called = true
# just being paranoid here: make sure the test outcome is as expected.
# It should be, since we made sure the tables have the same checksums,
# but it helps understand what the test code does
res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME}")
assert_equal 1, res.count
res.each do |row|
assert_equal 1, row["id"]
assert_equal updated_data, row["data"]
end
end

ghostferry.run

# make sure the test framework called the expected hooks above - otherwise
# the test doesn't make much sense
assert row_copy_called
assert binlog_apply_called
assert_test_table_is_identical
end

private

def insert_json_on_source
Expand Down
28 changes: 28 additions & 0 deletions vendor/github.com/siddontang/go-mysql/schema/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1a9ea62

Please sign in to comment.