diff --git a/pkg/loader/util.go b/pkg/loader/util.go index 20ef3dd0f..661a4952b 100644 --- a/pkg/loader/util.go +++ b/pkg/loader/util.go @@ -25,6 +25,8 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/tidb-binlog/pkg/sql" + "github.com/pingcap/tidb/errno" ) var ( @@ -83,6 +85,56 @@ func getTableInfo(db *gosql.DB, schema string, table string) (info *tableInfo, e var customID int64 +func isUnknownSystemVariableErr(err error) bool { + code, ok := sql.GetSQLErrCode(err) + if !ok { + return strings.Contains(err.Error(), "Unknown system variable") + } + + return code == errno.ErrUnknownSystemVariable +} + +func createDBWitSessions(dsn string) (db *gosql.DB, err error) { + // Try set this sessions if it's supported. + params := map[string]string{ + // After https://github.com/pingcap/tidb/pull/17102 + // default is false, must enable for insert value explicit, or can't replicate. + "allow_auto_random_explicit_insert": "1", + } + + var tryDB *gosql.DB + tryDB, err = gosql.Open("mysql", dsn) + if err != nil { + return nil, errors.Trace(err) + } + defer tryDB.Close() + + support := make(map[string]string) + for k, v := range params { + s := fmt.Sprintf("SET SESSION %s = ?", k) + _, err := tryDB.Exec(s, v) + if err != nil { + if isUnknownSystemVariableErr(err) { + continue + } + return nil, errors.Trace(err) + } + + support[k] = v + } + + for k, v := range support { + dsn += fmt.Sprintf("&%s=%s", k, url.QueryEscape(v)) + } + + db, err = gosql.Open("mysql", dsn) + if err != nil { + return nil, errors.Trace(err) + } + + return +} + // CreateDBWithSQLMode return sql.DB func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, sqlMode *string) (db *gosql.DB, err error) { dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4,utf8&interpolateParams=true&readTimeout=1m&multiStatements=true", user, password, host, port) @@ -100,11 +152,7 @@ func CreateDBWithSQLMode(user string, password string, host string, port int, tl dsn += "&tls=" + name } - db, err = gosql.Open("mysql", dsn) - if err != nil { - return nil, errors.Trace(err) - } - return + return createDBWitSessions(dsn) } // CreateDB return sql.DB diff --git a/tests/dailytest/case.go b/tests/dailytest/case.go index 9de1e64d9..605d3a4c2 100644 --- a/tests/dailytest/case.go +++ b/tests/dailytest/case.go @@ -27,14 +27,14 @@ import ( ) // https://pingcap.com/docs-cn/dev/reference/sql/attributes/auto-random/ -// var caseAutoRandom = []string{ -// "create table t (a bigint primary key auto_random, b varchar(255))", -// "insert into t(b) values('11')", -// } +var caseAutoRandom = []string{ + "create table t (a bigint primary key auto_random, b varchar(255))", + "insert into t(b) values('11')", +} -// var caseAutoRandomClean = []string{ -// "drop table t", -// } +var caseAutoRandomClean = []string{ + "drop table t", +} // test different data type of mysql // mysql will change boolean to tinybit(1) @@ -213,9 +213,8 @@ func RunCase(src *sql.DB, dst *sql.DB, schema string) { tr.run(caseUpdateWhileAddingCol) tr.execSQLs([]string{"DROP TABLE growing_cols;"}) - // [2020-05-29T04:00:51.258Z] [2020/05/29 11:58:09.889 +08:00] [ERROR] [executor.go:111] ["Exec fail, will rollback"] [query="INSERT INTO `test`.`t`(`a`,`b`) VALUES(?,?)"] [args="[6629298651489370113,\"11\"]"] [error="Error 8216: Invalid auto random: Explicit insertion on auto_random column is disabled. Try to set @@allow_auto_random_explicit_insert = true."] - // tr.execSQLs(caseAutoRandom) - // tr.execSQLs(caseAutoRandomClean) + tr.execSQLs(caseAutoRandom) + tr.execSQLs(caseAutoRandomClean) tr.execSQLs(caseMultiDataType) tr.execSQLs(caseMultiDataTypeClean) diff --git a/tests/kafka/kafka.go b/tests/kafka/kafka.go index d765ba62b..0b6904745 100644 --- a/tests/kafka/kafka.go +++ b/tests/kafka/kafka.go @@ -58,7 +58,7 @@ func main() { panic(err) } - sinkDB, err := util.CreateSinkDB() + sinkDB, err := loader.CreateDB("root", "", "127.0.0.1", 3306, nil) if err != nil { panic(err) }