diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go
index 110470b2..ef8928e0 100644
--- a/test/go/dml_events_test.go
+++ b/test/go/dml_events_test.go
@@ -29,7 +29,7 @@ func (this *DMLEventsTestSuite) SetupTest() {
 
 	columns := []schema.TableColumn{
 		{Name: "col1"},
-		{Name: "col2"},
+		{Name: "col2", Type: schema.TYPE_JSON},
 		{Name: "col3"},
 	}
 
@@ -62,12 +62,13 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() {
 		Rows: [][]interface{}{
 			{1000, []byte("val1"), true},
 			{1001, []byte("val2"), false},
+			{1002, "{\"val\": 42.0}", false},
 		},
 	}
 
 	dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.eventBase, rowsEvent)
 	this.Require().Nil(err)
-	this.Require().Equal(2, len(dmlEvents))
+	this.Require().Equal(3, len(dmlEvents))
 
 	q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
 	this.Require().Nil(err)
@@ -76,6 +77,10 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() {
 	q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
 	this.Require().Nil(err)
 	this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1001,_binary'val2',0)", q2)
+
+	q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
+	this.Require().Nil(err)
+	this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1002,CAST('{\"val\": 42.0}' AS JSON),0)", q3)
 }
 
 func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsError() {
@@ -117,12 +122,14 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() {
 			{1000, []byte("val2"), false},
 			{1001, []byte("val3"), false},
 			{1001, []byte("val4"), true},
+			{1002, "{\"val\": 42.0}", false},
+			{1002, "{\"val\": 43.0}", false},
 		},
 	}
 
 	dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.eventBase, rowsEvent)
 	this.Require().Nil(err)
-	this.Require().Equal(2, len(dmlEvents))
+	this.Require().Equal(3, len(dmlEvents))
 
 	q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
 	this.Require().Nil(err)
@@ -131,6 +138,10 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() {
 	q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
 	this.Require().Nil(err)
 	this.Require().Equal("UPDATE `target_schema`.`target_table` SET `col1`=1001,`col2`=_binary'val4',`col3`=1 WHERE `col1`=1001 AND `col2`=_binary'val3' AND `col3`=0", q2)
+
+	q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name)
+	this.Require().Nil(err)
+	this.Require().Equal("UPDATE `target_schema`.`target_table` SET `col1`=1002,`col2`=CAST('{\"val\": 43.0}' AS JSON),`col3`=0 WHERE `col1`=1002 AND `col2`=CAST('{\"val\": 42.0}' AS JSON) AND `col3`=0", q3)
 }
 
 func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithWrongColumnsReturnsError() {
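
The assertions above expect values bound to the JSON-typed column to be emitted as CAST('...' AS JSON) rather than the _binary'...' literals used for the byte-slice columns. The sketch below is an editorial illustration of that quoting rule in isolation, not part of the patch; quoteValue is a hypothetical helper and not ghostferry's actual implementation, and real code would also have to escape quotes inside the value.

package main

import (
	"fmt"

	"github.com/go-mysql-org/go-mysql/schema"
)

// quoteValue mirrors the expected SQL strings asserted in the test above.
// It is a hypothetical helper for illustration only.
func quoteValue(col schema.TableColumn, value interface{}) string {
	switch v := value.(type) {
	case string:
		if col.Type == schema.TYPE_JSON {
			// CAST makes MySQL parse the text server-side, so the target
			// column ends up holding a real JSON value, not a plain string.
			return fmt.Sprintf("CAST('%s' AS JSON)", v)
		}
		return fmt.Sprintf("'%s'", v)
	case []byte:
		// Matches the _binary'val1' literals asserted for non-JSON columns.
		return fmt.Sprintf("_binary'%s'", v)
	default:
		return fmt.Sprint(v)
	}
}

func main() {
	jsonCol := schema.TableColumn{Name: "col2", Type: schema.TYPE_JSON}
	fmt.Println(quoteValue(jsonCol, `{"val": 42.0}`))                          // CAST('{"val": 42.0}' AS JSON)
	fmt.Println(quoteValue(schema.TableColumn{Name: "col2"}, []byte("val1"))) // _binary'val1'
}
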
diff --git a/test/integration/types_test.rb b/test/integration/types_test.rb
index 0b908e8d..86f8cfa3 100644
--- a/test/integration/types_test.rb
+++ b/test/integration/types_test.rb
@@ -1,13 +1,16 @@
 require "test_helper"
 
 class TypesTest < GhostferryTestCase
-  JSON_OBJ = '{"data": {"quote": "\\\'", "value": [1]}}'
+  JSON_OBJ = '{"data": {"quote": "\\\'", "value": [1, 12.13]}}'
+  JSON_OBJ_WITH_TRAILING_ZERO = '{"data": {"float": 32.0}}'
   EMPTY_JSON = '{}'
   JSON_ARRAY = '[\"test_data\", \"test_data_2\"]'
   JSON_NULL = 'null'
   JSON_TRUE = 'true'
   JSON_FALSE = 'false'
   JSON_NUMBER = '42'
+  JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART = '52.0'
+  JSON_FLOATING_POINT_WITH_NON_ZERO_FRACTIONAL_PART = '52.13'
 
   def test_json_colum_not_null_with_no_default_is_invalid_this_is_fine
     # See: https://bugs.mysql.com/bug.php?id=98496
@@ -103,10 +106,10 @@ def test_json_data_insert
     # with a JSON column is broken on 5.7.
     # See: https://bugs.mysql.com/bug.php?id=87847
     res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
-    assert_equal 16, res.first["cnt"]
+    assert_equal 22, res.first["cnt"]
 
     expected = [
-      {"id"=>1, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"},
+      {"id"=>1, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1, 12.13]}}"},
       {"id"=>2, "data"=>"[\"test_data\", \"test_data_2\"]"},
       {"id"=>3, "data"=>"{}"},
       {"id"=>4, "data"=>nil},
@@ -114,17 +117,19 @@ def test_json_data_insert
       {"id"=>6, "data"=>"true"},
       {"id"=>7, "data"=>"false"},
       {"id"=>8, "data"=>"42"},
-
-      {"id"=>9, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"},
-      {"id"=>10, "data"=>"[\"test_data\", \"test_data_2\"]"},
-      {"id"=>11, "data"=>"{}"},
-      {"id"=>12, "data"=>nil},
-      {"id"=>13, "data"=>"null"},
-      {"id"=>14, "data"=>"true"},
-      {"id"=>15, "data"=>"false"},
-      {"id"=>16, "data"=>"42"},
+      {"id"=>9, "data"=>"52.13"},
+      {"id" => 10, "data" => format_float_based_on_mysql_version("52.0")},
+      {"id" => 11, "data" => "{\"data\": {\"float\": #{format_float_based_on_mysql_version("32.0")}}}"}
     ]
 
+    expected_length = expected.length
+    expected_for_second_insert = Marshal.load(Marshal.dump(expected)) # makes deep copy of the original array
+
+    expected += expected_for_second_insert.map do |row|
+      row["id"] += expected_length
+      row
+    end
+
     res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME} ORDER BY id ASC")
     res.zip(expected).each do |row, expected_row|
       assert_equal expected_row, row
@@ -152,8 +157,8 @@ def test_json_data_delete
     loop do
       sleep 0.1
       res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
-      if res.first["cnt"] == 8
-        1.upto(8) do |i|
+      if res.first["cnt"] == 11
+        1.upto(11) do |i|
           source_db.query("DELETE FROM #{DEFAULT_FULL_TABLE_NAME} WHERE id = #{i}")
         end
         break
@@ -194,15 +199,18 @@ def test_json_data_update
     loop do
       sleep 0.1
       res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
-      if res.first["cnt"] == 8
+      if res.first["cnt"] == 11
         source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{EMPTY_JSON}' WHERE id = 1")
         source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_ARRAY}' WHERE id = 2")
         source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = NULL WHERE id = 3")
-        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_OBJ}' WHERE id = 4")
+        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_OBJ_WITH_TRAILING_ZERO}' WHERE id = 4")
         source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_TRUE}' WHERE id = 5")
         source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_FALSE}' WHERE id = 6")
-        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NUMBER}' WHERE id = 7")
-        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NULL}' WHERE id = 8")
+        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NULL}' WHERE id = 7")
+        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART}' WHERE id = 8")
+        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_FLOATING_POINT_WITH_NON_ZERO_FRACTIONAL_PART}' WHERE id = 9")
+        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_NUMBER}' WHERE id = 10")
+        source_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = '#{JSON_OBJ}' WHERE id = 11")
 
         break
       end
@@ -217,17 +225,20 @@ def test_json_data_update
     refute timedout, "failed due to time out while waiting for the 4 insert binlogs to be written to the target"
 
     res = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
-    assert_equal 8, res.first["cnt"]
+    assert_equal 11, res.first["cnt"]
 
     expected = [
       {"id"=>1, "data"=>"{}"},
       {"id"=>2, "data"=>"[\"test_data\", \"test_data_2\"]"},
       {"id"=>3, "data"=>nil},
-      {"id"=>4, "data"=>"{\"data\": {\"quote\": \"'\", \"value\": [1]}}"},
+      {"id"=>4, "data"=>"{\"data\": {\"float\": #{format_float_based_on_mysql_version("32.0")}}}"},
       {"id"=>5, "data"=>"true"},
       {"id"=>6, "data"=>"false"},
-      {"id"=>7, "data"=>"42"},
-      {"id"=>8, "data"=>"null"},
+      {"id"=>7, "data"=>"null"},
+      {"id"=>8, "data"=>format_float_based_on_mysql_version("52.0")},
+      {"id"=>9, "data"=>"52.13"},
+      {"id" => 10, "data" => "42"},
+      {"id" => 11, "data" => "{\"data\": {\"quote\": \"'\", \"value\": [1, 12.13]}}"},
     ]
 
     res = target_db.query("SELECT * FROM #{DEFAULT_FULL_TABLE_NAME} ORDER BY id ASC")
@@ -319,7 +330,7 @@ def test_copy_data_in_fixed_size_binary_column
 
   def test_copy_data_in_fixed_size_binary_column__value_completely_filled
     # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
-    # 
+    #
     # NOTE: This test is interesting (beyond what is covered above already),
     # because it seems the server strips the trailing 0-bytes before sending
     # them to the binlog even when the trailing 0-bytes are inserted by the user.
@@ -334,7 +345,7 @@ def test_copy_data_in_fixed_size_binary_column__value_completely_filled
 
   def test_copy_data_in_fixed_size_binary_column__value_is_empty_and_length_is_1
     # Also see: https://github.com/Shopify/ghostferry/pull/159#issuecomment-597769258
-    # 
+    #
     # slight variation to cover the corner-case where there is no data in the
     # column at all and the entire value is 0-padded (here, only 1 byte)
     execute_copy_data_in_fixed_size_binary_column(
@@ -393,10 +404,13 @@ def test_decimal
     end
   end
 
-
-
   private
 
+  def format_float_based_on_mysql_version(value)
+    # mysql 5.7 removes the trailing zeros when `cast...as json` is used
+    ENV["MYSQL_VERSION"] == "8.0" ? value.to_s : value.to_i.to_s
+  end
+
   def insert_json_on_source
     source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_OBJ}')")
     source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_ARRAY}')")
@@ -406,6 +420,9 @@ def insert_json_on_source
     source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_TRUE}')")
     source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FALSE}')")
     source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_NUMBER}')")
+    source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FLOATING_POINT_WITH_NON_ZERO_FRACTIONAL_PART}')")
+    source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_FLOATING_POINT_WITH_ZERO_FRACTIONAL_PART}')")
+    source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (data) VALUES ('#{JSON_OBJ_WITH_TRAILING_ZERO}')")
   end
 
   def execute_copy_data_in_fixed_size_binary_column(column_size:, inserted_data:, expected_inserted_data:, updated_data:)
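
The integration test above wraps expected floats in format_float_based_on_mysql_version because, as its comment notes, MySQL 5.7 drops the trailing zero when CAST(... AS JSON) is used while 8.0 keeps it. The vendored decoder change that follows addresses the Go side of the same round-trip: encoding/json also drops a zero fractional part when a binlog JSONB double is carried as a plain float64. A standalone illustration of that loss (editorial, not part of the patch):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// A decoded JSONB double of 32.0, marshaled as a plain float64, loses
	// its ".0", so the JSON text written to the target would differ from
	// the "32.0" the integration test expects on MySQL 8.0.
	b, _ := json.Marshal(map[string]float64{"float": 32.0})
	fmt.Println(string(b)) // {"float":32}
}
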
diff --git a/vendor/github.com/go-mysql-org/go-mysql/replication/json_binary.go b/vendor/github.com/go-mysql-org/go-mysql/replication/json_binary.go
index ab6c0753..e8d87256 100644
--- a/vendor/github.com/go-mysql-org/go-mysql/replication/json_binary.go
+++ b/vendor/github.com/go-mysql-org/go-mysql/replication/json_binary.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
+	"strconv"
 
 	. "github.com/go-mysql-org/go-mysql/mysql"
 	"github.com/pingcap/errors"
@@ -44,6 +45,16 @@ const (
 	jsonbValueEntrySizeLarge = 1 + jsonbLargeOffsetSize
 )
 
+type FloatWithTrailingZero float64
+
+func (f FloatWithTrailingZero) MarshalJSON() ([]byte, error) {
+	if float64(f) == float64(int(f)) {
+		return []byte(strconv.FormatFloat(float64(f), 'f', 1, 32)), nil
+	}
+
+	return []byte(strconv.FormatFloat(float64(f), 'f', -1, 32)), nil
+}
+
 func jsonbGetOffsetSize(isSmall bool) int {
 	if isSmall {
 		return jsonbSmallOffsetSize
@@ -128,7 +139,7 @@ func (d *jsonBinaryDecoder) decodeValue(tp byte, data []byte) interface{} {
 	case JSONB_UINT64:
 		return d.decodeUint64(data)
 	case JSONB_DOUBLE:
-		return d.decodeDouble(data)
+		return d.decodeDoubleWithTrailingZero(data)
 	case JSONB_STRING:
 		return d.decodeString(data)
 	case JSONB_OPAQUE:
@@ -348,6 +359,11 @@ func (d *jsonBinaryDecoder) decodeDouble(data []byte) float64 {
 	return v
 }
 
+func (d *jsonBinaryDecoder) decodeDoubleWithTrailingZero(data []byte) FloatWithTrailingZero {
+	v := d.decodeDouble(data)
+	return FloatWithTrailingZero(v)
+}
+
 func (d *jsonBinaryDecoder) decodeString(data []byte) string {
 	if d.err != nil {
 		return ""
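
A minimal usage sketch of the FloatWithTrailingZero type added above (editorial, not part of the patch; it assumes the patched replication package is importable at its usual module path):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/go-mysql-org/go-mysql/replication"
)

func main() {
	// MarshalJSON keeps a ".0" for doubles with a zero fractional part and
	// otherwise uses the shortest float32 representation, so both values
	// below serialize to the text the tests above expect.
	whole, _ := json.Marshal(replication.FloatWithTrailingZero(32.0))
	frac, _ := json.Marshal(replication.FloatWithTrailingZero(52.13))
	fmt.Println(string(whole)) // 32.0
	fmt.Println(string(frac))  // 52.13
}
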