Skip to content

Commit

Permalink
add union key
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <ihusharp@gmail.com>
  • Loading branch information
HuSharp committed Aug 26, 2024
1 parent d041cd0 commit f7ca2d1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
24 changes: 11 additions & 13 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,32 +61,29 @@ type RunawayRecord struct {
Match string
Action string
SQLText string
SQLDigest string
PlanDigest string
Source string
}

// GenRunawayQueriesStmt generates statement with given RunawayRecords.
func GenRunawayQueriesStmt(records []*RunawayRecord) (string, []any) {
var builder strings.Builder
params := make([]any, 0, len(records)*7)
builder.WriteString("insert into mysql.tidb_runaway_queries VALUES ")
params := make([]any, 0, len(records)*8)
builder.WriteString("INSERT INTO mysql.tidb_runaway_queries " +
"(resource_group_name, time, match_type, action, original_sql, sql_digest, plan_digest, tidb_server) VALUES ")
for count, r := range records {
if count > 0 {
builder.WriteByte(',')
}
builder.WriteString("(%?, %?, %?, %?, %?, %?, %?)")
params = append(params, r.ResourceGroupName)
params = append(params, r.Time)
params = append(params, r.Match)
params = append(params, r.Action)
params = append(params, r.SQLText)
params = append(params, r.PlanDigest)
params = append(params, r.Source)
builder.WriteString("(%?, %?, %?, %?, %?, %?, %?, %?)")
params = append(params, r.ResourceGroupName, r.Time, r.Match, r.Action, r.SQLText, r.SQLDigest, r.PlanDigest, r.Source)
}
builder.WriteString(" ON DUPLICATE KEY UPDATE time = VALUES(time), repeats = repeats + 1 ")
return builder.String(), params
}

// QuarantineRecord is used to save records which will be insert into mysql.tidb_runaway_watch.
// QuarantineRecord is used to save records which will be inserted into mysql.tidb_runaway_watch.
type QuarantineRecord struct {
ID int64
ResourceGroupName string
Expand Down Expand Up @@ -379,7 +376,7 @@ func (rm *RunawayManager) getWatchFromWatchList(key string) *QuarantineRecord {
return nil
}

func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest, action, matchType string, now *time.Time) {
func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, sqlDigest, planDigest, action, matchType string, now *time.Time) {
source := rm.serverID
if !rm.syncerInitialized.Load() {
rm.logOnce.Do(func() {
Expand All @@ -394,6 +391,7 @@ func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest
Match: matchType,
Action: action,
SQLText: originalSQL,
SQLDigest: sqlDigest,
PlanDigest: planDigest,
Source: source,
}:
Expand Down Expand Up @@ -647,7 +645,7 @@ func (r *RunawayChecker) markRunawayByWatch(action rmpb.RunawayAction) {
func (r *RunawayChecker) markRunaway(matchType string, action rmpb.RunawayAction, now *time.Time) {
actionStr := strings.ToLower(action.String())
metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType, actionStr).Inc()
r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, actionStr, matchType, now)
r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.sqlDigest, r.planDigest, actionStr, matchType, now)
}

func (r *RunawayChecker) getSettingConvictIdentifier() string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
)

const (
runawayRecordFlushInterval = time.Second
runawayRecordFlushInterval = 10 * time.Second
runawayRecordGCInterval = time.Hour * 24
runawayRecordExpiredDuration = time.Hour * 24 * 7
runawayWatchSyncInterval = time.Second
Expand Down
23 changes: 20 additions & 3 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,13 +622,16 @@ const (
CreateRunawayTable = `CREATE TABLE IF NOT EXISTS mysql.tidb_runaway_queries (
resource_group_name varchar(32) not null,
time TIMESTAMP NOT NULL,
repeats int default 0,
match_type varchar(12) NOT NULL,
action varchar(12) NOT NULL,
original_sql TEXT NOT NULL,
plan_digest TEXT NOT NULL,
sql_digest varchar(64) NOT NULL,
plan_digest varchar(64) NOT NULL,
tidb_server varchar(512),
INDEX plan_index(plan_digest(64)) COMMENT "accelerate the speed when select runaway query",
INDEX time_index(time) COMMENT "accelerate the speed when querying with active watch"
INDEX time_index(time) COMMENT "accelerate the speed when querying with active watch",
UNIQUE KEY runaway_task(resource_group_name, sql_digest, plan_digest)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`

// CreateRunawayWatchTable stores the condition which is used to check whether query should be quarantined.
Expand Down Expand Up @@ -1110,11 +1113,14 @@ const (

// version211 add column `summary` to `mysql.tidb_background_subtask_history`.
version211 = 211

// version212 modify column `plan_digest`, add column `sql_digest` and unique union key to `mysql.tidb_runaway_queries`.
version212 = 212
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version211
var currentBootstrapVersion int64 = version212

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1281,6 +1287,7 @@ var (
upgradeToVer209,
upgradeToVer210,
upgradeToVer211,
upgradeToVer212,
}
)

Expand Down Expand Up @@ -3080,6 +3087,16 @@ func upgradeToVer211(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.tidb_background_subtask_history ADD COLUMN `summary` JSON", infoschema.ErrColumnExists)
}

func upgradeToVer212(s sessiontypes.Session, ver int64) {
if ver >= version212 {
return
}
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries MODIFY COLUMN `plan_digest` varchar(64) DEFAULT '';", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD COLUMN `sql_digest` varchar(64) DEFAULT '' AFTER `original_sql`;", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD COLUMN `repeats` int DEFAULT 0 AFTER `time`;", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD UNIQUE KEY runaway_task(resource_group_name, sql_digest, plan_digest);", dbterror.ErrDupKeyName)
}

// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
Expand Down

0 comments on commit f7ca2d1

Please sign in to comment.