Skip to content

Commit

Permalink
Set MySQL sink default isolation level to read-committed (#4138)
Browse files Browse the repository at this point in the history
close #3589
  • Loading branch information
maxshuang committed Jan 16, 2022
1 parent 402f062 commit 36e4685
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 3 deletions.
1 change: 1 addition & 0 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
values := make([][]interface{}, 0, len(rows))
replaces := make(map[string][][]interface{})
rowCount := 0
// translateToInsert control the update and insert behavior
translateToInsert := s.params.enableOldValue && !s.params.safeMode

// flush cached batch replace or insert, to keep the sequence of DMLs
Expand Down
3 changes: 3 additions & 0 deletions cdc/sink/mysql_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
defaultWriteTimeout = "2m"
defaultDialTimeout = "2m"
defaultSafeMode = true
defaultTxnIsolationRC = "READ-COMMITTED"
)

var defaultParams = &sinkParams{
Expand Down Expand Up @@ -227,6 +228,8 @@ func generateDSNByParams(
dsnCfg.Params["readTimeout"] = params.readTimeout
dsnCfg.Params["writeTimeout"] = params.writeTimeout
dsnCfg.Params["timeout"] = params.dialTimeout
// Since we don't need select, just set default isolation level to read-committed
dsnCfg.Params["transaction_isolation"] = fmt.Sprintf(`"%s"`, defaultTxnIsolationRC)

autoRandom, err := checkTiDBVariable(ctx, testDB, "allow_auto_random_explicit_insert", "1")
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions cdc/sink/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
)

func TestSinkParamsClone(t *testing.T) {
defer testleak.AfterTestT(t)()
param1 := defaultParams.Clone()
param2 := param1.Clone()
param2.changefeedID = "123"
Expand Down Expand Up @@ -59,8 +58,6 @@ func TestSinkParamsClone(t *testing.T) {
}

func TestGenerateDSNByParams(t *testing.T) {
defer testleak.AfterTestT(t)()

testDefaultParams := func() {
db, err := mockTestDB(false)
require.Nil(t, err)
Expand All @@ -76,6 +73,7 @@ func TestGenerateDSNByParams(t *testing.T) {
"readTimeout=2m",
"writeTimeout=2m",
"allow_auto_random_explicit_insert=1",
"transaction_isolation=%22READ-COMMITTED%22",
}
for _, param := range expectedParams {
require.True(t, strings.Contains(dsnStr, param))
Expand Down

0 comments on commit 36e4685

Please sign in to comment.