diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 5e542d0983c..10008810de7 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -69,8 +69,6 @@ const ( // SyncpointTableName is the name of table where all syncpoint maps sit const syncpointTableName string = "syncpoint_v1" -const tidbVersionString string = "TiDB" - var validSchemes = map[string]bool{ "mysql": true, "mysql+ssl": true, @@ -308,37 +306,21 @@ var defaultParams = &sinkParams{ safeMode: defaultSafeMode, } -func checkIsTiDB(ctx context.Context, db *sql.DB) (bool, error) { - var value string - querySQL := "select version();" - err := db.QueryRowContext(ctx, querySQL).Scan(&value) - if err != nil && err != sql.ErrNoRows { - return false, errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), "failed to select version") - } - return strings.Contains(value, tidbVersionString), nil -} - -func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultValue string) ( - sinkURIParameter string, - err error, -) { +func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultValue string) (string, error) { var name string var value string querySQL := fmt.Sprintf("show session variables like '%s';", variableName) - err = db.QueryRowContext(ctx, querySQL).Scan(&name, &value) + err := db.QueryRowContext(ctx, querySQL).Scan(&name, &value) if err != nil && err != sql.ErrNoRows { errMsg := "fail to query session variable " + variableName - err = errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg) - return + return "", errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg) } + // session variable works, use given default value if err == nil { - // session variable exists, use given default value - sinkURIParameter = defaultValue - } else { - // session variable does not exist, sinkURIParameter is "" and will be ignored - err = nil + return defaultValue, nil } - return + // session variable not exists, return "" to ignore it + return "", nil } func configureSinkURI( @@ -377,21 +359,6 @@ func configureSinkURI( dsnCfg.Params["tidb_txn_mode"] = txnMode } - isTiDB, err := checkIsTiDB(ctx, testDB) - if err != nil { - return "", err - } - // variable `explicit_defaults_for_timestamp` is readonly in TiDB, we don't - // need to set it. Yet Default value in MySQL 5.7 is `OFF` - // ref: https://docs.pingcap.com/tidb/stable/mysql-compatibility#default-differences - if !isTiDB { - explicitTs, err := checkTiDBVariable(ctx, testDB, "explicit_defaults_for_timestamp", "ON") - if err != nil { - return "", err - } - dsnCfg.Params["explicit_defaults_for_timestamp"] = explicitTs - } - dsnClone := dsnCfg.Clone() dsnClone.Passwd = "******" log.Info("sink uri is configured", zap.String("format dsn", dsnClone.FormatDSN())) diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go index a1dbf65c982..92e63677643 100644 --- a/cdc/sink/mysql_test.go +++ b/cdc/sink/mysql_test.go @@ -704,29 +704,6 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { c.Assert(err, check.IsNil) defer db.Close() - dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") - c.Assert(err, check.IsNil) - params := defaultParams.Clone() - dsnStr, err := configureSinkURI(context.TODO(), dsn, params, db) - c.Assert(err, check.IsNil) - expectedParams := []string{ - "tidb_txn_mode=optimistic", - "readTimeout=2m", - "writeTimeout=2m", - "allow_auto_random_explicit_insert=1", - "explicit_defaults_for_timestamp=ON", - } - for _, param := range expectedParams { - c.Assert(strings.Contains(dsnStr, param), check.IsTrue) - } - c.Assert(strings.Contains(dsnStr, "time_zone"), check.IsFalse) - } - - testDefaultParamsTiDB := func() { - db, err := mockTestDBTiDB() - c.Assert(err, check.IsNil) - defer db.Close() - dsn, err := dmysql.ParseDSN("root:123456@tcp(127.0.0.1:4000)/") c.Assert(err, check.IsNil) params := defaultParams.Clone() @@ -782,7 +759,6 @@ func (s MySQLSinkSuite) TestConfigureSinkURI(c *check.C) { } testDefaultParams() - testDefaultParamsTiDB() testTimezoneParam() testTimeoutParams() } @@ -901,34 +877,6 @@ func mockTestDB() (*sql.DB, error) { mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "pessimistic"), ) - // Simulate the default value in MySQL5.7 is OFF - mock.ExpectQuery("select version\\(\\);").WillReturnRows( - sqlmock.NewRows([]string{"version"}).AddRow("5.7.32"), - ) - // Simulate the default value in MySQL5.7 is OFF - mock.ExpectQuery("show session variables like 'explicit_defaults_for_timestamp';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("explicit_defaults_for_timestamp", "OFF"), - ) - mock.ExpectClose() - return db, nil -} - -func mockTestDBTiDB() (*sql.DB, error) { - // mock for test db, which is used querying TiDB session variable - db, mock, err := sqlmock.New() - if err != nil { - return nil, err - } - columns := []string{"Variable_name", "Value"} - mock.ExpectQuery("show session variables like 'allow_auto_random_explicit_insert';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("allow_auto_random_explicit_insert", "1"), - ) - mock.ExpectQuery("show session variables like 'tidb_txn_mode';").WillReturnRows( - sqlmock.NewRows(columns).AddRow("tidb_txn_mode", "optimistic"), - ) - mock.ExpectQuery("select version\\(\\);").WillReturnRows( - sqlmock.NewRows([]string{"version"}).AddRow("5.7.25-TiDB-v5.0.0"), - ) mock.ExpectClose() return db, nil }