Skip to content

Commit

Permalink
ddl: migrate part of ddl package code from Execute/ExecRestricted to …
Browse files Browse the repository at this point in the history
…safe API (1) (pingcap#22670)

Signed-off-by: AilinKid <314806019@qq.com>
  • Loading branch information
AilinKid committed Mar 3, 2021
1 parent b12aa4a commit ea10d52
Showing 1 changed file with 35 additions and 20 deletions.
55 changes: 35 additions & 20 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package util

import (
"context"
"encoding/hex"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
Expand All @@ -24,17 +24,17 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"golang.org/x/net/context"
)

const (
deleteRangesTable = `gc_delete_range`
doneDeleteRangesTable = `gc_delete_range_done`
loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%s WHERE ts < %v`
recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = "%s" WHERE job_id = %d AND element_id = %d AND start_key = "%s"`
deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %d AND element_id = %d`
loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%n WHERE ts < %?`
recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?`
completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %? AND element_id = %?`
updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = %? WHERE job_id = %? AND element_id = %? AND start_key = %?`
deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %? AND element_id = %?`
loadDDLReorgVarsSQL = `select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%?, %?)`
)

// DelRangeTask is for run delete-range command in gc_worker.
Expand All @@ -59,7 +59,10 @@ func LoadDoneDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []De
}

func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint uint64) (ranges []DelRangeTask, _ error) {
sql := fmt.Sprintf(loadDeleteRangeSQL, table, safePoint)
sql, err := sqlexec.EscapeSQL(loadDeleteRangeSQL, table, safePoint)
if err != nil {
return nil, err
}
rss, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rss) > 0 {
defer terror.Call(rss[0].Close)
Expand Down Expand Up @@ -103,41 +106,53 @@ func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint u
// CompleteDeleteRange moves a record from gc_delete_range table to gc_delete_range_done table.
// NOTE: This function WILL NOT start and run in a new transaction internally.
func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error {
sql := fmt.Sprintf(recordDoneDeletedRangeSQL, dr.JobID, dr.ElementID)
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
sql, err := sqlexec.EscapeSQL(recordDoneDeletedRangeSQL, dr.JobID, dr.ElementID)
if err != nil {
return err
}
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}

sql = fmt.Sprintf(completeDeleteRangeSQL, dr.JobID, dr.ElementID)
sql, err = sqlexec.EscapeSQL(completeDeleteRangeSQL, dr.JobID, dr.ElementID)
if err != nil {
return err
}
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
return errors.Trace(err)
}

// DeleteDoneRecord removes a record from gc_delete_range_done table.
func DeleteDoneRecord(ctx sessionctx.Context, dr DelRangeTask) error {
sql := fmt.Sprintf(deleteDoneRecordSQL, dr.JobID, dr.ElementID)
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
sql, err := sqlexec.EscapeSQL(deleteDoneRecordSQL, dr.JobID, dr.ElementID)
if err != nil {
return err
}
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
return errors.Trace(err)
}

// UpdateDeleteRange is only for emulator.
func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, oldStartKey kv.Key) error {
newStartKeyHex := hex.EncodeToString(newStartKey)
oldStartKeyHex := hex.EncodeToString(oldStartKey)
sql := fmt.Sprintf(updateDeleteRangeSQL, newStartKeyHex, dr.JobID, dr.ElementID, oldStartKeyHex)
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
sql, err := sqlexec.EscapeSQL(updateDeleteRangeSQL, newStartKeyHex, dr.JobID, dr.ElementID, oldStartKeyHex)
if err != nil {
return err
}
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
return errors.Trace(err)
}

const loadDDLReorgVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in ('" +
variable.TiDBDDLReorgWorkerCount + "', '" +
variable.TiDBDDLReorgBatchSize + "')"

// LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables.
func LoadDDLReorgVars(ctx sessionctx.Context) error {
if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
rows, _, err := sctx.ExecRestrictedSQL(ctx, loadDDLReorgVarsSQL)
sql, err := sqlexec.EscapeSQL(loadDDLReorgVarsSQL, variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize)
if err != nil {
return err
}
rows, _, err := sctx.ExecRestrictedSQL(ctx, sql)
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit ea10d52

Please sign in to comment.