Skip to content

Commit

Permalink
pkg/parser: support SWITCH_GROUP syntax for runaway watch (#54804)
Browse files Browse the repository at this point in the history
ref #54434
  • Loading branch information
JmPotato authored Sep 4, 2024
1 parent e77d4a1 commit c9baef4
Show file tree
Hide file tree
Showing 26 changed files with 11,209 additions and 11,041 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6010,13 +6010,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "b1af34db24f2650cd2a687fa9c58bd746eb3ca76e08afe5dbe2ac569ce54b597",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20240716095229-5f7ffec83ea7",
sha256 = "368b662c8669d91bcd488b780b8ecb273855b8fc54c1043907171bdebc3ffc54",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20240904041139-1de8accd5bb7",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240716095229-5f7ffec83ea7.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240716095229-5f7ffec83ea7.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240716095229-5f7ffec83ea7.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240716095229-5f7ffec83ea7.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240904041139-1de8accd5bb7.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240904041139-1de8accd5bb7.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240904041139-1de8accd5bb7.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240904041139-1de8accd5bb7.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(211), session.CurrentBootstrapVersion)
require.Equal(t, int64(212), session.CurrentBootstrapVersion)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20240527053858-9b3b6e34194a
github.com/pingcap/fn v1.0.0
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7
github.com/pingcap/kvproto v0.0.0-20240904041139-1de8accd5bb7
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7 h1:V9XS3FQ/P6u+kFaoSyY5DBswIA774BMpIOLDBMrpxKc=
github.com/pingcap/kvproto v0.0.0-20240716095229-5f7ffec83ea7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240904041139-1de8accd5bb7 h1:AHDEjW05jX67LWNOCQtqsirfx1XpGOGI24ey4bPdkB8=
github.com/pingcap/kvproto v0.0.0-20240904041139-1de8accd5bb7/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func SetDirectResourceGroupRunawayOption(resourceGroupSettings *model.ResourceGr
settings.ExecElapsedTimeMs = uint64(dur.Milliseconds())
case pmodel.RunawayAction:
settings.Action = opt.ActionOption.Type
settings.SwitchGroupName = opt.ActionOption.SwitchGroupName.String()
case pmodel.RunawayWatch:
settings.WatchType = opt.WatchOption.Type
if dur := opt.WatchOption.Duration; len(dur) > 0 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/resourcegroup/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ var (
ErrInvalidResourceGroupRunawayExecElapsedTime = errors.New("invalid exec elapsed time")
// ErrUnknownResourceGroupRunawayAction is from group.go.
ErrUnknownResourceGroupRunawayAction = errors.New("unknown resource group runaway action")
// ErrUnknownResourceGroupRunawaySwitchGroupName is from group.go.
ErrUnknownResourceGroupRunawaySwitchGroupName = errors.New("unknown resource group runaway switch group name")
)
10 changes: 10 additions & 0 deletions pkg/ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,29 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)
runaway := &rmpb.RunawaySettings{
Rule: &rmpb.RunawayRule{},
}

// Update the rule settings.
if options.Runaway.ExecElapsedTimeMs == 0 {
return nil, ErrInvalidResourceGroupRunawayExecElapsedTime
}
runaway.Rule.ExecElapsedTimeMs = options.Runaway.ExecElapsedTimeMs
if options.Runaway.Action == pmodel.RunawayActionNone {
return nil, ErrUnknownResourceGroupRunawayAction
}
// Update the action settings.
runaway.Action = rmpb.RunawayAction(options.Runaway.Action)
if options.Runaway.Action == pmodel.RunawayActionSwitchGroup && len(options.Runaway.SwitchGroupName) == 0 {
return nil, ErrUnknownResourceGroupRunawaySwitchGroupName
}
// TODO: validate the switch group name to ensure it exists.
runaway.SwitchGroupName = options.Runaway.SwitchGroupName
// Update the watch settings.
if options.Runaway.WatchType != pmodel.WatchNone {
runaway.Watch = &rmpb.RunawayWatch{}
runaway.Watch.Type = rmpb.RunawayWatchType(options.Runaway.WatchType)
runaway.Watch.LastingDurationMs = options.Runaway.WatchDurationMs
}

group.RunawaySettings = runaway
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,9 +550,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
_, planDigest := GetPlanDigest(stmtCtx)
_, sqlDigest := stmtCtx.SQLDigest()
stmtCtx.RunawayChecker = rm.DeriveChecker(stmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String(), sessionVars.StartTime)
if err := stmtCtx.RunawayChecker.BeforeExecutor(); err != nil {
switchGroupName, err := stmtCtx.RunawayChecker.BeforeExecutor()
if err != nil {
return nil, err
}
if len(switchGroupName) > 0 {
stmtCtx.ResourceGroupName = switchGroupName
}
}
ctx = a.observeStmtBeginForTopSQL(ctx)

Expand Down
11 changes: 8 additions & 3 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3647,7 +3647,6 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e
watches := do.RunawayManager().GetWatchList()
rows := make([][]types.Datum, 0, len(watches))
for _, watch := range watches {
action := watch.Action
row := types.MakeDatums(
watch.ID,
watch.ResourceGroupName,
Expand All @@ -3656,7 +3655,7 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e
watch.Watch.String(),
watch.WatchText,
watch.Source,
action.String(),
watch.GetActionString(),
)
if watch.EndTime.Equal(runaway.NullTime) {
row[3].SetString("UNLIMITED", mysql.DefaultCollationName)
Expand Down Expand Up @@ -3698,7 +3697,13 @@ func (e *memtableRetriever) setDataFromResourceGroups() error {
}
dur := time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond
fmt.Fprintf(limitBuilder, "EXEC_ELAPSED='%s'", dur.String())
fmt.Fprintf(limitBuilder, ", ACTION=%s", pmodel.RunawayActionType(setting.Action).String())
actionType := pmodel.RunawayActionType(setting.Action)
switch actionType {
case pmodel.RunawayActionDryRun, pmodel.RunawayActionCooldown, pmodel.RunawayActionKill:
fmt.Fprintf(limitBuilder, ", ACTION=%s", actionType.String())
case pmodel.RunawayActionSwitchGroup:
fmt.Fprintf(limitBuilder, ", ACTION=%s(%s)", actionType.String(), setting.SwitchGroupName)
}
if setting.Watch != nil {
if setting.Watch.LastingDurationMs > 0 {
dur := time.Duration(setting.Watch.LastingDurationMs) * time.Millisecond
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func TestColumnTable(t *testing.T) {
testkit.RowsWithSep("|",
"test|tbl1|col_2"))
tk.MustQuery(`select count(*) from information_schema.columns;`).Check(
testkit.RowsWithSep("|", "4919"))
testkit.RowsWithSep("|", "4921"))
}

func TestIndexUsageTable(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/internal/querywatch/query_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func setWatchOption(ctx context.Context,
}
case ast.QueryWatchAction:
record.Action = rmpb.RunawayAction(op.ActionOption.Type)
record.SwitchGroupName = op.ActionOption.SwitchGroupName.String()
case ast.QueryWatchType:
textOption := op.TextOption
expr, err := plannerutil.RewriteAstExprWithPlanCtx(sctx.GetPlanCtx(), textOption.PatternExpr, nil, nil, false)
Expand Down Expand Up @@ -151,7 +152,9 @@ func validateWatchRecord(record *runaway.QuarantineRecord, client *rmclient.Reso
return errors.Errorf("must set runaway config for resource group `%s`", record.ResourceGroupName)
}
record.Action = rg.RunawaySettings.Action
record.SwitchGroupName = rg.RunawaySettings.SwitchGroupName
}
// TODO: validate the switch group.
if record.Watch == rmpb.RunawayWatchType_NoneWatch {
return errors.Errorf("must specify watch type")
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/meta/model/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
type ResourceGroupRunawaySettings struct {
ExecElapsedTimeMs uint64 `json:"exec_elapsed_time_ms"`
Action model.RunawayActionType `json:"action"`
SwitchGroupName string `json:"switch_group_name"`
WatchType model.RunawayWatchType `json:"watch_type"`
WatchDurationMs int64 `json:"watch_duration_ms"`
}
Expand Down Expand Up @@ -84,7 +85,11 @@ func (p *ResourceGroupSettings) String() string {
}
if p.Runaway != nil {
writeSettingDurationToBuilder(sb, "QUERY_LIMIT=(EXEC_ELAPSED", time.Duration(p.Runaway.ExecElapsedTimeMs)*time.Millisecond, separatorFn)
writeSettingItemToBuilder(sb, "ACTION="+p.Runaway.Action.String())
if p.Runaway.Action == model.RunawayActionSwitchGroup {
writeSettingItemToBuilder(sb, fmt.Sprintf("ACTION=%s(%s)", p.Runaway.Action.String(), p.Runaway.SwitchGroupName))
} else {
writeSettingItemToBuilder(sb, "ACTION="+p.Runaway.Action.String())
}
if p.Runaway.WatchType != model.WatchNone {
writeSettingItemToBuilder(sb, "WATCH="+p.Runaway.WatchType.String())
if p.Runaway.WatchDurationMs > 0 {
Expand Down
17 changes: 15 additions & 2 deletions pkg/parser/ast/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2339,14 +2339,27 @@ func (n *ResourceGroupRunawayRuleOption) restore(ctx *format.RestoreCtx) error {
// ResourceGroupRunawayActionOption is used for parsing the resource group runaway action.
type ResourceGroupRunawayActionOption struct {
node
Type model.RunawayActionType
Type model.RunawayActionType
SwitchGroupName model.CIStr
}

// Restore implements Node interface.
func (n *ResourceGroupRunawayActionOption) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("ACTION ")
ctx.WritePlain("= ")
ctx.WriteKeyWord(n.Type.String())
switch n.Type {
case model.RunawayActionNone, model.RunawayActionDryRun, model.RunawayActionCooldown, model.RunawayActionKill:
ctx.WriteKeyWord(n.Type.String())
case model.RunawayActionSwitchGroup:
switchGroup := n.SwitchGroupName.String()
if len(switchGroup) == 0 {
return errors.New("SWITCH_GROUP runaway watch action requires a non-empty group name")
}
ctx.WriteKeyWord("SWITCH_GROUP")
ctx.WritePlain("(")
ctx.WriteName(switchGroup)
ctx.WritePlain(")")
}
return nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/parser/ast/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,10 @@ func TestResourceGroupDDLStmtRestore(t *testing.T) {
"CREATE RESOURCE GROUP IF NOT EXISTS rg1 RU_PER_SEC = 500 QUERY_LIMIT=(EXEC_ELAPSED='60s', ACTION=COOLDOWN)",
"CREATE RESOURCE GROUP IF NOT EXISTS `rg1` RU_PER_SEC = 500, QUERY_LIMIT = (EXEC_ELAPSED = '60s' ACTION = COOLDOWN)",
},
{
"CREATE RESOURCE GROUP IF NOT EXISTS rg1 RU_PER_SEC = 500 QUERY_LIMIT=(ACTION=SWITCH_GROUP(rg2))",
"CREATE RESOURCE GROUP IF NOT EXISTS `rg1` RU_PER_SEC = 500, QUERY_LIMIT = (ACTION = SWITCH_GROUP(`rg2`))",
},
}
extractNodeFunc := func(node Node) Node {
return node.(*CreateResourceGroupStmt)
Expand All @@ -963,6 +967,10 @@ func TestResourceGroupDDLStmtRestore(t *testing.T) {
"ALTER RESOURCE GROUP rg1 QUERY_LIMIT=(EXEC_ELAPSED='60s', ACTION=KILL, WATCH=SIMILAR DURATION='10m')",
"ALTER RESOURCE GROUP `rg1` QUERY_LIMIT = (EXEC_ELAPSED = '60s' ACTION = KILL WATCH = SIMILAR DURATION = '10m')",
},
{
"ALTER RESOURCE GROUP rg1 QUERY_LIMIT=(EXEC_ELAPSED='1m', ACTION=SWITCH_GROUP(rg2), WATCH=SIMILAR DURATION='10m')",
"ALTER RESOURCE GROUP `rg1` QUERY_LIMIT = (EXEC_ELAPSED = '1m' ACTION = SWITCH_GROUP(`rg2`) WATCH = SIMILAR DURATION = '10m')",
},
{
"ALTER RESOURCE GROUP rg1 QUERY_LIMIT=NULL",
"ALTER RESOURCE GROUP `rg1` QUERY_LIMIT = NULL",
Expand Down
4 changes: 4 additions & 0 deletions pkg/parser/ast/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ func TestAddQueryWatchStmtRestore(t *testing.T) {
"QUERY WATCH ADD RESOURCE GROUP rg1 ACTION COOLDOWN PLAN DIGEST 'd08bc323a934c39dc41948b0a073725be3398479b6fa4f6dd1db2a9b115f7f57'",
"QUERY WATCH ADD RESOURCE GROUP `rg1` ACTION = COOLDOWN PLAN DIGEST _UTF8MB4'd08bc323a934c39dc41948b0a073725be3398479b6fa4f6dd1db2a9b115f7f57'",
},
{
"QUERY WATCH ADD ACTION SWITCH_GROUP(rg1) SQL TEXT EXACT TO 'select * from test.t1'",
"QUERY WATCH ADD ACTION = SWITCH_GROUP(`rg1`) SQL TEXT EXACT TO _UTF8MB4'select * from test.t1'",
},
}
extractNodeFunc := func(node ast.Node) ast.Node {
return node.(*ast.AddQueryWatchStmt)
Expand Down
1 change: 1 addition & 0 deletions pkg/parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ var tokenMap = map[string]int{
"SURVIVAL_PREFERENCES": survivalPreferences,
"SWAPS": swaps,
"SWITCHES": switchesSym,
"SWITCH_GROUP": switchGroup,
"SYSTEM": system,
"SYSTEM_TIME": systemTime,
"TARGET": target,
Expand Down
3 changes: 3 additions & 0 deletions pkg/parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ const (
RunawayActionDryRun
RunawayActionCooldown
RunawayActionKill
RunawayActionSwitchGroup
)

// RunawayWatchType is the type of runaway watch.
Expand Down Expand Up @@ -349,6 +350,8 @@ func (t RunawayActionType) String() string {
return "COOLDOWN"
case RunawayActionKill:
return "KILL"
case RunawayActionSwitchGroup:
return "SWITCH_GROUP"
default:
return "DRYRUN"
}
Expand Down
Loading

0 comments on commit c9baef4

Please sign in to comment.