From 017c69c964ed08be805b8d01c4b5e5de2b443e41 Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Tue, 2 Feb 2021 17:26:14 +0800 Subject: [PATCH 1/8] migrate part of ddl package code from Execute/ExecRestricted to safe API Signed-off-by: AilinKid <314806019@qq.com> --- ddl/column.go | 23 +++++++++++++++---- ddl/db_integration_test.go | 2 +- ddl/db_partition_test.go | 6 ++--- ddl/db_test.go | 20 ++++++++-------- ddl/ddl_worker_test.go | 2 +- ddl/failtest/fail_db_test.go | 2 +- ddl/testutil/testutil.go | 4 ++-- ddl/util/util.go | 44 ++++++++++++++++-------------------- 8 files changed, 56 insertions(+), 47 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index cdcf0443f66cf..5aa4d5dfdeca2 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -939,7 +939,12 @@ func (w *worker) doModifyColumnTypeWithData( } defer w.sessPool.put(ctx) - _, _, err = ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(valStr) + stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), valStr) + if err != nil { + job.State = model.JobStateCancelled + failpoint.Return(ver, err) + } + _, _, err = ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt) if err != nil { job.State = model.JobStateCancelled failpoint.Return(ver, err) @@ -1526,15 +1531,23 @@ func checkAndApplyNewAutoRandomBits(job *model.Job, t *meta.Meta, tblInfo *model // `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql. func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, newCol model.CIStr, oldCols ...*model.ColumnInfo) error { colsStr := "" + paramsList := make([]interface{}, 0, 2+len(oldCols)) + paramsList = append(paramsList, schema.L, table.L) for i, col := range oldCols { if i == 0 { - colsStr += "`" + col.Name.L + "` is null" + colsStr += "%n is null" + paramsList = append(paramsList, col.Name.L) } else { - colsStr += " or `" + col.Name.L + "` is null" + colsStr += " or %n is null" + paramsList = append(paramsList, col.Name.L) } } - sql := fmt.Sprintf("select 1 from `%s`.`%s` where %s limit 1;", schema.L, table.L, colsStr) - rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) + sql := "select 1 from %n.%n where " + colsStr + " limit 1;" + stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), sql, paramsList...) + if err != nil { + return errors.Trace(err) + } + rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.Background(), stmt) if err != nil { return errors.Trace(err) } diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index 96490aa8c719d..ee7d4f96eeca7 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -87,7 +87,7 @@ func setupIntegrationSuite(s *testIntegrationSuite, c *C) { se, err := session.CreateSession4Test(s.store) c.Assert(err, IsNil) s.ctx = se.(sessionctx.Context) - _, err = se.Execute(context.Background(), "create database test_db") + _, err = se.ExecuteInternal(context.Background(), "create database test_db") c.Assert(err, IsNil) } diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 2471c4c4593e1..cf82551e5e5f7 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -3185,7 +3185,7 @@ func (s *testIntegrationSuite7) TestCommitWhenSchemaChange(c *C) { defer func() { atomic.StoreUint32(&session.SchemaChangedWithoutRetry, 0) }() - _, err := tk.Se.Execute(context.Background(), "commit") + _, err := tk.Se.ExecuteInternal(context.Background(), "commit") c.Assert(domain.ErrInfoSchemaChanged.Equal(err), IsTrue) // Cover a bug that schema validator does not prevent transaction commit when @@ -3203,7 +3203,7 @@ func (s *testIntegrationSuite7) TestCommitWhenSchemaChange(c *C) { tk.MustExec("insert into nt values (1), (3), (5);") tk2.MustExec("alter table pt exchange partition p1 with table nt;") tk.MustExec("insert into nt values (7), (9);") - _, err = tk.Se.Execute(context.Background(), "commit") + _, err = tk.Se.ExecuteInternal(context.Background(), "commit") c.Assert(domain.ErrInfoSchemaChanged.Equal(err), IsTrue) tk.MustExec("admin check table pt") @@ -3215,7 +3215,7 @@ func (s *testIntegrationSuite7) TestCommitWhenSchemaChange(c *C) { tk.MustExec("insert into pt values (1), (3), (5);") tk2.MustExec("alter table pt exchange partition p1 with table nt;") tk.MustExec("insert into pt values (7), (9);") - _, err = tk.Se.Execute(context.Background(), "commit") + _, err = tk.Se.ExecuteInternal(context.Background(), "commit") c.Assert(domain.ErrInfoSchemaChanged.Equal(err), IsTrue) tk.MustExec("admin check table pt") diff --git a/ddl/db_test.go b/ddl/db_test.go index de12d9d70a5f5..f28c7d1e94ece 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -115,13 +115,13 @@ func setUpSuite(s *testDBSuite, c *C) { s.s, err = session.CreateSession4Test(s.store) c.Assert(err, IsNil) - _, err = s.s.Execute(context.Background(), "create database test_db") + _, err = s.s.ExecuteInternal(context.Background(), "create database test_db") c.Assert(err, IsNil) - s.s.Execute(context.Background(), "set @@global.tidb_max_delta_schema_count= 4096") + s.s.ExecuteInternal(context.Background(), "set @@global.tidb_max_delta_schema_count= 4096") } func tearDownSuite(s *testDBSuite, c *C) { - s.s.Execute(context.Background(), "drop database if exists test_db") + s.s.ExecuteInternal(context.Background(), "drop database if exists test_db") s.s.Close() s.dom.Close() s.store.Close() @@ -277,12 +277,12 @@ func backgroundExec(s kv.Storage, sql string, done chan error) { return } defer se.Close() - _, err = se.Execute(context.Background(), "use test_db") + _, err = se.ExecuteInternal(context.Background(), "use test_db") if err != nil { done <- errors.Trace(err) return } - _, err = se.Execute(context.Background(), sql) + _, err = se.ExecuteInternal(context.Background(), sql) done <- errors.Trace(err) } @@ -2048,9 +2048,9 @@ func (s *testDBSuite1) TestColumn(c *C) { func sessionExec(c *C, s kv.Storage, sql string) { se, err := session.CreateSession4Test(s) c.Assert(err, IsNil) - _, err = se.Execute(context.Background(), "use test_db") + _, err = se.ExecuteInternal(context.Background(), "use test_db") c.Assert(err, IsNil) - rs, err := se.Execute(context.Background(), sql) + rs, err := se.ExecuteInternal(context.Background(), sql) c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err))) c.Assert(rs, IsNil) se.Close() @@ -2420,7 +2420,7 @@ func (s *testDBSuite5) TestRenameColumn(c *C) { } func (s *testDBSuite7) TestSelectInViewFromAnotherDB(c *C) { - _, _ = s.s.Execute(context.Background(), "create database test_db2") + _, _ = s.s.ExecuteInternal(context.Background(), "create database test_db2") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use " + s.schemaName) tk.MustExec("create table t(a int)") @@ -5770,12 +5770,12 @@ func (s *testDBSuite4) testParallelExecSQL(c *C, sql1, sql2 string, se1, se2 ses }() go func() { defer wg.Done() - _, err1 = se1.Execute(context.Background(), sql1) + _, err1 = se1.ExecuteInternal(context.Background(), sql1) }() go func() { defer wg.Done() <-ch - _, err2 = se2.Execute(context.Background(), sql2) + _, err2 = se2.ExecuteInternal(context.Background(), sql2) }() wg.Wait() diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 7c74ae1ec1dcf..78782ab654c18 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -1446,5 +1446,5 @@ func (s *testDDLSuite) TestDDLPackageExecuteSQL(c *C) { c.Assert(err, IsNil) defer worker.sessPool.put(sess) se := sess.(sqlexec.SQLExecutor) - _, _ = se.Execute(context.Background(), "create table t(a int);") + _, _ = se.ExecuteInternal(context.Background(), "create table t(a int);") } diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index 1d393d0549962..2740466e9fcc1 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -89,7 +89,7 @@ func (s *testFailDBSuite) SetUpSuite(c *C) { } func (s *testFailDBSuite) TearDownSuite(c *C) { - _, err := s.se.Execute(context.Background(), "drop database if exists test_db_state") + _, err := s.se.ExecuteInternal(context.Background(), "drop database if exists test_db_state") c.Assert(err, IsNil) s.se.Close() s.dom.Close() diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go index 875528acd3e39..b7e0d381b3138 100644 --- a/ddl/testutil/testutil.go +++ b/ddl/testutil/testutil.go @@ -40,13 +40,13 @@ func ExecMultiSQLInGoroutine(c *check.C, s kv.Storage, dbName string, multiSQL [ return } defer se.Close() - _, err = se.Execute(context.Background(), "use "+dbName) + _, err = se.ExecuteInternal(context.Background(), "use %n", dbName) if err != nil { done <- errors.Trace(err) return } for _, sql := range multiSQL { - rs, err := se.Execute(context.Background(), sql) + rs, err := se.ExecuteInternal(context.Background(), sql) if err != nil { done <- errors.Trace(err) return diff --git a/ddl/util/util.go b/ddl/util/util.go index 295376945f7c6..38ec535dc7de7 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -32,12 +32,12 @@ import ( const ( deleteRangesTable = `gc_delete_range` doneDeleteRangesTable = `gc_delete_range_done` - loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%s WHERE ts < %v` - recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d` - completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d` - completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id in (%v)` - updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = "%s" WHERE job_id = %d AND element_id = %d AND start_key = "%s"` - deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %d AND element_id = %d` + loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` + recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` + completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` + completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (%?)` + updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = "%?" WHERE job_id = %? AND element_id = %? AND start_key = "%?"` + deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` ) // DelRangeTask is for run delete-range command in gc_worker. @@ -62,16 +62,14 @@ func LoadDoneDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []De } func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint uint64) (ranges []DelRangeTask, _ error) { - sql := fmt.Sprintf(loadDeleteRangeSQL, table, safePoint) - rss, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) - if len(rss) > 0 { - defer terror.Call(rss[0].Close) + rs, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), loadDeleteRangeSQL, table, safePoint) + if rs != nil { + defer terror.Call(rs.Close) } if err != nil { return nil, errors.Trace(err) } - rs := rss[0] req := rs.NewChunk() it := chunk.NewIterator4Chunk(req) for { @@ -106,8 +104,7 @@ func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint u // CompleteDeleteRange moves a record from gc_delete_range table to gc_delete_range_done table. // NOTE: This function WILL NOT start and run in a new transaction internally. func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error { - sql := fmt.Sprintf(recordDoneDeletedRangeSQL, dr.JobID, dr.ElementID) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), recordDoneDeletedRangeSQL, dr.JobID, dr.ElementID) if err != nil { return errors.Trace(err) } @@ -117,8 +114,7 @@ func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error { // RemoveFromGCDeleteRange is exported for ddl pkg to use. func RemoveFromGCDeleteRange(ctx sessionctx.Context, jobID, elementID int64) error { - sql := fmt.Sprintf(completeDeleteRangeSQL, jobID, elementID) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), completeDeleteRangeSQL, jobID, elementID) return errors.Trace(err) } @@ -131,15 +127,13 @@ func RemoveMultiFromGCDeleteRange(ctx sessionctx.Context, jobID int64, elementID } buf.WriteString(strconv.FormatInt(elementID, 10)) } - sql := fmt.Sprintf(completeDeleteMultiRangesSQL, jobID, buf.String()) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), completeDeleteMultiRangesSQL, jobID, buf.String()) return errors.Trace(err) } // DeleteDoneRecord removes a record from gc_delete_range_done table. func DeleteDoneRecord(ctx sessionctx.Context, dr DelRangeTask) error { - sql := fmt.Sprintf(deleteDoneRecordSQL, dr.JobID, dr.ElementID) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), deleteDoneRecordSQL, dr.JobID, dr.ElementID) return errors.Trace(err) } @@ -147,8 +141,7 @@ func DeleteDoneRecord(ctx sessionctx.Context, dr DelRangeTask) error { func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, oldStartKey kv.Key) error { newStartKeyHex := hex.EncodeToString(newStartKey) oldStartKeyHex := hex.EncodeToString(oldStartKey) - sql := fmt.Sprintf(updateDeleteRangeSQL, newStartKeyHex, dr.JobID, dr.ElementID, oldStartKeyHex) - _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), updateDeleteRangeSQL, newStartKeyHex, dr.JobID, dr.ElementID, oldStartKeyHex) return errors.Trace(err) } @@ -164,7 +157,7 @@ func LoadDDLVars(ctx sessionctx.Context) error { return LoadGlobalVars(ctx, []string{variable.TiDBDDLErrorCountLimit}) } -const loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%s)" +const loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%?)" // LoadGlobalVars loads global variable from mysql.global_variables. func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { @@ -176,8 +169,11 @@ func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { } nameList += fmt.Sprintf("'%s'", name) } - sql := fmt.Sprintf(loadGlobalVarsSQL, nameList) - rows, _, err := sctx.ExecRestrictedSQL(sql) + stmt, err := sctx.ParseWithParams(context.Background(), loadGlobalVarsSQL, nameList) + if err != nil { + return errors.Trace(err) + } + rows, _, err := sctx.ExecRestrictedStmt(context.Background(), stmt) if err != nil { return errors.Trace(err) } From d768063e9a980ceac29abdcf28ed832b24452557 Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Wed, 3 Feb 2021 13:07:09 +0800 Subject: [PATCH 2/8] address comment Signed-off-by: AilinKid <314806019@qq.com> --- ddl/util/util.go | 45 ++++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/ddl/util/util.go b/ddl/util/util.go index 38ec535dc7de7..8d0a034eb7055 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -14,12 +14,8 @@ package util import ( - "bytes" "context" "encoding/hex" - "fmt" - "strconv" - "github.com/pingcap/errors" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/kv" @@ -30,14 +26,15 @@ import ( ) const ( - deleteRangesTable = `gc_delete_range` - doneDeleteRangesTable = `gc_delete_range_done` - loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` - recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` - completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` - completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (%?)` - updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = "%?" WHERE job_id = %? AND element_id = %? AND start_key = "%?"` - deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` + deleteRangesTable = `gc_delete_range` + doneDeleteRangesTable = `gc_delete_range_done` + loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` + recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` + completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` + completeDeleteMultiRangesSQLPrefix = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (` + completeDeleteMultiRangesSQLSuffix = `)` + updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` + deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` ) // DelRangeTask is for run delete-range command in gc_worker. @@ -120,14 +117,18 @@ func RemoveFromGCDeleteRange(ctx sessionctx.Context, jobID, elementID int64) err // RemoveMultiFromGCDeleteRange is exported for ddl pkg to use. func RemoveMultiFromGCDeleteRange(ctx sessionctx.Context, jobID int64, elementIDs []int64) error { - var buf bytes.Buffer + idList := "" + paramIDs := make([]interface{}, 0, 1+len(elementIDs)) + paramIDs = append(paramIDs, jobID) for i, elementID := range elementIDs { if i > 0 { - buf.WriteString(", ") + idList += ", " } - buf.WriteString(strconv.FormatInt(elementID, 10)) + idList += "%?" + paramIDs = append(paramIDs, elementID) } - _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), completeDeleteMultiRangesSQL, jobID, buf.String()) + completeDeleteMultiRangesSQL := completeDeleteMultiRangesSQLPrefix + idList + completeDeleteMultiRangesSQLSuffix + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), completeDeleteMultiRangesSQL, paramIDs...) return errors.Trace(err) } @@ -157,19 +158,25 @@ func LoadDDLVars(ctx sessionctx.Context) error { return LoadGlobalVars(ctx, []string{variable.TiDBDDLErrorCountLimit}) } -const loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%?)" +const ( + loadGlobalVarsSQLPrefix = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (" + loadGlobalVarsSQLSuffix = ")" +) // LoadGlobalVars loads global variable from mysql.global_variables. func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { nameList := "" + paramNames := make([]interface{}, 0, len(varNames)) for i, name := range varNames { if i > 0 { nameList += ", " } - nameList += fmt.Sprintf("'%s'", name) + nameList += "%?" + paramNames = append(paramNames, name) } - stmt, err := sctx.ParseWithParams(context.Background(), loadGlobalVarsSQL, nameList) + loadGlobalVarsSQL := loadGlobalVarsSQLPrefix + nameList + loadGlobalVarsSQLSuffix + stmt, err := sctx.ParseWithParams(context.Background(), loadGlobalVarsSQL, paramNames) if err != nil { return errors.Trace(err) } From ba73205e17dacc5c789d5a9e848cc7e2d92b7c05 Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Wed, 3 Feb 2021 13:15:31 +0800 Subject: [PATCH 3/8] make fmt Signed-off-by: AilinKid <314806019@qq.com> --- ddl/util/util.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/util/util.go b/ddl/util/util.go index 8d0a034eb7055..23f99157d6e46 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -16,6 +16,7 @@ package util import ( "context" "encoding/hex" + "github.com/pingcap/errors" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/kv" From 29a6f6885d6f43e23046dd5211ef989e782429d5 Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Wed, 3 Feb 2021 13:37:10 +0800 Subject: [PATCH 4/8] . Signed-off-by: AilinKid <314806019@qq.com> --- ddl/util/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/util/util.go b/ddl/util/util.go index 23f99157d6e46..d64cb9b8a54eb 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -177,7 +177,7 @@ func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { paramNames = append(paramNames, name) } loadGlobalVarsSQL := loadGlobalVarsSQLPrefix + nameList + loadGlobalVarsSQLSuffix - stmt, err := sctx.ParseWithParams(context.Background(), loadGlobalVarsSQL, paramNames) + stmt, err := sctx.ParseWithParams(context.Background(), loadGlobalVarsSQL, paramNames...) if err != nil { return errors.Trace(err) } From fad310deb839066ebb4c1ea587e6243b357a9545 Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Wed, 3 Feb 2021 15:54:42 +0800 Subject: [PATCH 5/8] restore the old api in the test Signed-off-by: AilinKid <314806019@qq.com> --- ddl/db_integration_test.go | 2 +- ddl/db_partition_test.go | 6 +++--- ddl/db_test.go | 20 ++++++++++---------- ddl/ddl_worker_test.go | 2 +- ddl/failtest/fail_db_test.go | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/ddl/db_integration_test.go b/ddl/db_integration_test.go index ee7d4f96eeca7..96490aa8c719d 100644 --- a/ddl/db_integration_test.go +++ b/ddl/db_integration_test.go @@ -87,7 +87,7 @@ func setupIntegrationSuite(s *testIntegrationSuite, c *C) { se, err := session.CreateSession4Test(s.store) c.Assert(err, IsNil) s.ctx = se.(sessionctx.Context) - _, err = se.ExecuteInternal(context.Background(), "create database test_db") + _, err = se.Execute(context.Background(), "create database test_db") c.Assert(err, IsNil) } diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index cf82551e5e5f7..2471c4c4593e1 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -3185,7 +3185,7 @@ func (s *testIntegrationSuite7) TestCommitWhenSchemaChange(c *C) { defer func() { atomic.StoreUint32(&session.SchemaChangedWithoutRetry, 0) }() - _, err := tk.Se.ExecuteInternal(context.Background(), "commit") + _, err := tk.Se.Execute(context.Background(), "commit") c.Assert(domain.ErrInfoSchemaChanged.Equal(err), IsTrue) // Cover a bug that schema validator does not prevent transaction commit when @@ -3203,7 +3203,7 @@ func (s *testIntegrationSuite7) TestCommitWhenSchemaChange(c *C) { tk.MustExec("insert into nt values (1), (3), (5);") tk2.MustExec("alter table pt exchange partition p1 with table nt;") tk.MustExec("insert into nt values (7), (9);") - _, err = tk.Se.ExecuteInternal(context.Background(), "commit") + _, err = tk.Se.Execute(context.Background(), "commit") c.Assert(domain.ErrInfoSchemaChanged.Equal(err), IsTrue) tk.MustExec("admin check table pt") @@ -3215,7 +3215,7 @@ func (s *testIntegrationSuite7) TestCommitWhenSchemaChange(c *C) { tk.MustExec("insert into pt values (1), (3), (5);") tk2.MustExec("alter table pt exchange partition p1 with table nt;") tk.MustExec("insert into pt values (7), (9);") - _, err = tk.Se.ExecuteInternal(context.Background(), "commit") + _, err = tk.Se.Execute(context.Background(), "commit") c.Assert(domain.ErrInfoSchemaChanged.Equal(err), IsTrue) tk.MustExec("admin check table pt") diff --git a/ddl/db_test.go b/ddl/db_test.go index f28c7d1e94ece..de12d9d70a5f5 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -115,13 +115,13 @@ func setUpSuite(s *testDBSuite, c *C) { s.s, err = session.CreateSession4Test(s.store) c.Assert(err, IsNil) - _, err = s.s.ExecuteInternal(context.Background(), "create database test_db") + _, err = s.s.Execute(context.Background(), "create database test_db") c.Assert(err, IsNil) - s.s.ExecuteInternal(context.Background(), "set @@global.tidb_max_delta_schema_count= 4096") + s.s.Execute(context.Background(), "set @@global.tidb_max_delta_schema_count= 4096") } func tearDownSuite(s *testDBSuite, c *C) { - s.s.ExecuteInternal(context.Background(), "drop database if exists test_db") + s.s.Execute(context.Background(), "drop database if exists test_db") s.s.Close() s.dom.Close() s.store.Close() @@ -277,12 +277,12 @@ func backgroundExec(s kv.Storage, sql string, done chan error) { return } defer se.Close() - _, err = se.ExecuteInternal(context.Background(), "use test_db") + _, err = se.Execute(context.Background(), "use test_db") if err != nil { done <- errors.Trace(err) return } - _, err = se.ExecuteInternal(context.Background(), sql) + _, err = se.Execute(context.Background(), sql) done <- errors.Trace(err) } @@ -2048,9 +2048,9 @@ func (s *testDBSuite1) TestColumn(c *C) { func sessionExec(c *C, s kv.Storage, sql string) { se, err := session.CreateSession4Test(s) c.Assert(err, IsNil) - _, err = se.ExecuteInternal(context.Background(), "use test_db") + _, err = se.Execute(context.Background(), "use test_db") c.Assert(err, IsNil) - rs, err := se.ExecuteInternal(context.Background(), sql) + rs, err := se.Execute(context.Background(), sql) c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err))) c.Assert(rs, IsNil) se.Close() @@ -2420,7 +2420,7 @@ func (s *testDBSuite5) TestRenameColumn(c *C) { } func (s *testDBSuite7) TestSelectInViewFromAnotherDB(c *C) { - _, _ = s.s.ExecuteInternal(context.Background(), "create database test_db2") + _, _ = s.s.Execute(context.Background(), "create database test_db2") tk := testkit.NewTestKit(c, s.store) tk.MustExec("use " + s.schemaName) tk.MustExec("create table t(a int)") @@ -5770,12 +5770,12 @@ func (s *testDBSuite4) testParallelExecSQL(c *C, sql1, sql2 string, se1, se2 ses }() go func() { defer wg.Done() - _, err1 = se1.ExecuteInternal(context.Background(), sql1) + _, err1 = se1.Execute(context.Background(), sql1) }() go func() { defer wg.Done() <-ch - _, err2 = se2.ExecuteInternal(context.Background(), sql2) + _, err2 = se2.Execute(context.Background(), sql2) }() wg.Wait() diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 78782ab654c18..7c74ae1ec1dcf 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -1446,5 +1446,5 @@ func (s *testDDLSuite) TestDDLPackageExecuteSQL(c *C) { c.Assert(err, IsNil) defer worker.sessPool.put(sess) se := sess.(sqlexec.SQLExecutor) - _, _ = se.ExecuteInternal(context.Background(), "create table t(a int);") + _, _ = se.Execute(context.Background(), "create table t(a int);") } diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index 2740466e9fcc1..1d393d0549962 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -89,7 +89,7 @@ func (s *testFailDBSuite) SetUpSuite(c *C) { } func (s *testFailDBSuite) TearDownSuite(c *C) { - _, err := s.se.ExecuteInternal(context.Background(), "drop database if exists test_db_state") + _, err := s.se.Execute(context.Background(), "drop database if exists test_db_state") c.Assert(err, IsNil) s.se.Close() s.dom.Close() From 7a8f1ff876a0cd248633f7740078f723f708e1cc Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Wed, 3 Feb 2021 18:42:34 +0800 Subject: [PATCH 6/8] address comment Signed-off-by: AilinKid <314806019@qq.com> --- ddl/column.go | 8 ++++++-- ddl/util/util.go | 38 +++++++++++++++++++++++--------------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index 5aa4d5dfdeca2..9a635e9057cfe 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1542,8 +1542,12 @@ func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, tab paramsList = append(paramsList, col.Name.L) } } - sql := "select 1 from %n.%n where " + colsStr + " limit 1;" - stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), sql, paramsList...) + var buf strings.Builder + _, err := fmt.Fprintf(&buf, "select 1 from %%n.%%n where %s limit 1", colsStr) + if err != nil { + return errors.Trace(err) + } + stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), buf.String(), paramsList...) if err != nil { return errors.Trace(err) } diff --git a/ddl/util/util.go b/ddl/util/util.go index d64cb9b8a54eb..1cb186342f9bb 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -16,6 +16,8 @@ package util import ( "context" "encoding/hex" + "fmt" + "strings" "github.com/pingcap/errors" "github.com/pingcap/parser/terror" @@ -27,15 +29,14 @@ import ( ) const ( - deleteRangesTable = `gc_delete_range` - doneDeleteRangesTable = `gc_delete_range_done` - loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` - recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` - completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` - completeDeleteMultiRangesSQLPrefix = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (` - completeDeleteMultiRangesSQLSuffix = `)` - updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` - deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` + deleteRangesTable = `gc_delete_range` + doneDeleteRangesTable = `gc_delete_range_done` + loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` + recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` + completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` + completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %%? AND element_id in (%s)` + updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` + deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` ) // DelRangeTask is for run delete-range command in gc_worker. @@ -128,8 +129,12 @@ func RemoveMultiFromGCDeleteRange(ctx sessionctx.Context, jobID int64, elementID idList += "%?" paramIDs = append(paramIDs, elementID) } - completeDeleteMultiRangesSQL := completeDeleteMultiRangesSQLPrefix + idList + completeDeleteMultiRangesSQLSuffix - _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), completeDeleteMultiRangesSQL, paramIDs...) + var buf strings.Builder + _, err := fmt.Fprintf(&buf, completeDeleteMultiRangesSQL, idList) + if err != nil { + return errors.Trace(err) + } + _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), buf.String(), paramIDs...) return errors.Trace(err) } @@ -160,8 +165,7 @@ func LoadDDLVars(ctx sessionctx.Context) error { } const ( - loadGlobalVarsSQLPrefix = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (" - loadGlobalVarsSQLSuffix = ")" + loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%s)" ) // LoadGlobalVars loads global variable from mysql.global_variables. @@ -176,8 +180,12 @@ func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { nameList += "%?" paramNames = append(paramNames, name) } - loadGlobalVarsSQL := loadGlobalVarsSQLPrefix + nameList + loadGlobalVarsSQLSuffix - stmt, err := sctx.ParseWithParams(context.Background(), loadGlobalVarsSQL, paramNames...) + var buf strings.Builder + _, err := fmt.Fprintf(&buf, loadGlobalVarsSQL, nameList) + if err != nil { + return errors.Trace(err) + } + stmt, err := sctx.ParseWithParams(context.Background(), buf.String(), paramNames...) if err != nil { return errors.Trace(err) } From f3b93809084065f18161a714d36b8553f7c82ae9 Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Thu, 4 Feb 2021 11:38:06 +0800 Subject: [PATCH 7/8] . Signed-off-by: AilinKid <314806019@qq.com> --- ddl/column.go | 13 +++++-------- ddl/util/util.go | 36 +++++++++++++----------------------- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index 9a635e9057cfe..8932957593a6d 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1530,23 +1530,20 @@ func checkAndApplyNewAutoRandomBits(job *model.Job, t *meta.Meta, tblInfo *model // checkForNullValue ensure there are no null values of the column of this table. // `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql. func checkForNullValue(ctx sessionctx.Context, isDataTruncated bool, schema, table, newCol model.CIStr, oldCols ...*model.ColumnInfo) error { - colsStr := "" + var buf strings.Builder + buf.WriteString("select 1 from %n.%n where ") paramsList := make([]interface{}, 0, 2+len(oldCols)) paramsList = append(paramsList, schema.L, table.L) for i, col := range oldCols { if i == 0 { - colsStr += "%n is null" + buf.WriteString("%n is null") paramsList = append(paramsList, col.Name.L) } else { - colsStr += " or %n is null" + buf.WriteString(" or %n is null") paramsList = append(paramsList, col.Name.L) } } - var buf strings.Builder - _, err := fmt.Fprintf(&buf, "select 1 from %%n.%%n where %s limit 1", colsStr) - if err != nil { - return errors.Trace(err) - } + buf.WriteString(" limit 1") stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(context.Background(), buf.String(), paramsList...) if err != nil { return errors.Trace(err) diff --git a/ddl/util/util.go b/ddl/util/util.go index 1cb186342f9bb..0edc0fd60c2ad 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -16,7 +16,6 @@ package util import ( "context" "encoding/hex" - "fmt" "strings" "github.com/pingcap/errors" @@ -34,9 +33,10 @@ const ( loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?` recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?` - completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %%? AND element_id in (%s)` + completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (` // + idList + ")" updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` + loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + append ")" ) // DelRangeTask is for run delete-range command in gc_worker. @@ -119,22 +119,19 @@ func RemoveFromGCDeleteRange(ctx sessionctx.Context, jobID, elementID int64) err // RemoveMultiFromGCDeleteRange is exported for ddl pkg to use. func RemoveMultiFromGCDeleteRange(ctx sessionctx.Context, jobID int64, elementIDs []int64) error { - idList := "" + var buf strings.Builder + buf.WriteString(completeDeleteMultiRangesSQL) paramIDs := make([]interface{}, 0, 1+len(elementIDs)) paramIDs = append(paramIDs, jobID) for i, elementID := range elementIDs { if i > 0 { - idList += ", " + buf.WriteString(", ") } - idList += "%?" + buf.WriteString("%?") paramIDs = append(paramIDs, elementID) } - var buf strings.Builder - _, err := fmt.Fprintf(&buf, completeDeleteMultiRangesSQL, idList) - if err != nil { - return errors.Trace(err) - } - _, err = ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), buf.String(), paramIDs...) + buf.WriteString(")") + _, err := ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), buf.String(), paramIDs...) return errors.Trace(err) } @@ -164,27 +161,20 @@ func LoadDDLVars(ctx sessionctx.Context) error { return LoadGlobalVars(ctx, []string{variable.TiDBDDLErrorCountLimit}) } -const ( - loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%s)" -) - // LoadGlobalVars loads global variable from mysql.global_variables. func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error { if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { - nameList := "" + var buf strings.Builder + buf.WriteString(loadGlobalVars) paramNames := make([]interface{}, 0, len(varNames)) for i, name := range varNames { if i > 0 { - nameList += ", " + buf.WriteString(", ") } - nameList += "%?" + buf.WriteString("%?") paramNames = append(paramNames, name) } - var buf strings.Builder - _, err := fmt.Fprintf(&buf, loadGlobalVarsSQL, nameList) - if err != nil { - return errors.Trace(err) - } + buf.WriteString(")") stmt, err := sctx.ParseWithParams(context.Background(), buf.String(), paramNames...) if err != nil { return errors.Trace(err) From cef2347c8859776dd6adff33116c64cb6a60859d Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Thu, 4 Feb 2021 11:42:54 +0800 Subject: [PATCH 8/8] . Signed-off-by: AilinKid <314806019@qq.com> --- ddl/testutil/testutil.go | 4 ++-- ddl/util/util.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go index b7e0d381b3138..875528acd3e39 100644 --- a/ddl/testutil/testutil.go +++ b/ddl/testutil/testutil.go @@ -40,13 +40,13 @@ func ExecMultiSQLInGoroutine(c *check.C, s kv.Storage, dbName string, multiSQL [ return } defer se.Close() - _, err = se.ExecuteInternal(context.Background(), "use %n", dbName) + _, err = se.Execute(context.Background(), "use "+dbName) if err != nil { done <- errors.Trace(err) return } for _, sql := range multiSQL { - rs, err := se.ExecuteInternal(context.Background(), sql) + rs, err := se.Execute(context.Background(), sql) if err != nil { done <- errors.Trace(err) return diff --git a/ddl/util/util.go b/ddl/util/util.go index 0edc0fd60c2ad..0c3ec2608b9eb 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -36,7 +36,7 @@ const ( completeDeleteMultiRangesSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id in (` // + idList + ")" updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?` deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?` - loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + append ")" + loadGlobalVars = `SELECT HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (` // + nameList + ")" ) // DelRangeTask is for run delete-range command in gc_worker.