From e5ef71af6cd6a36a2689710da4f9ca40bc481ad0 Mon Sep 17 00:00:00 2001 From: arenatlx <314806019@qq.com> Date: Wed, 4 Dec 2024 17:48:51 +0800 Subject: [PATCH 1/7] . Signed-off-by: arenatlx <314806019@qq.com> --- pkg/planner/cascades/base/BUILD.bazel | 3 + .../cascades/base/cascadesctx/BUILD.bazel | 12 ++ .../cascades/base/cascadesctx/cascades_ctx.go | 31 ++++ .../cascades/base/task_scheduler_base.go | 25 ++++ pkg/planner/cascades/base/task_stack_base.go | 36 +++++ pkg/planner/cascades/memo/BUILD.bazel | 1 + pkg/planner/cascades/memo/group.go | 20 +++ pkg/planner/cascades/memo/group_expr.go | 32 +++++ pkg/planner/cascades/task/BUILD.bazel | 17 ++- pkg/planner/cascades/task/base.go | 30 ++++ pkg/planner/cascades/task/task.go | 61 +++----- pkg/planner/cascades/task/task_apply_rule.go | 135 ++++++++++++++++++ pkg/planner/cascades/task/task_opt_group.go | 56 ++++++++ .../task/task_opt_group_expression.go | 67 +++++++++ pkg/planner/cascades/task/task_scheduler.go | 54 +++---- .../cascades/task/task_scheduler_test.go | 34 ++--- pkg/planner/cascades/task/task_test.go | 66 ++++++--- 17 files changed, 573 insertions(+), 107 deletions(-) create mode 100644 pkg/planner/cascades/base/cascadesctx/BUILD.bazel create mode 100644 pkg/planner/cascades/base/cascadesctx/cascades_ctx.go create mode 100644 pkg/planner/cascades/base/task_scheduler_base.go create mode 100644 pkg/planner/cascades/base/task_stack_base.go create mode 100644 pkg/planner/cascades/task/base.go create mode 100644 pkg/planner/cascades/task/task_apply_rule.go create mode 100644 pkg/planner/cascades/task/task_opt_group.go create mode 100644 pkg/planner/cascades/task/task_opt_group_expression.go diff --git a/pkg/planner/cascades/base/BUILD.bazel b/pkg/planner/cascades/base/BUILD.bazel index b97aa511e1665..e4bb07239c72a 100644 --- a/pkg/planner/cascades/base/BUILD.bazel +++ b/pkg/planner/cascades/base/BUILD.bazel @@ -5,9 +5,12 @@ go_library( srcs = [ "base.go", "hash_equaler.go", + "task_scheduler_base.go", + "task_stack_base.go", ], importpath = "github.com/pingcap/tidb/pkg/planner/cascades/base", visibility = ["//visibility:public"], + deps = ["//pkg/planner/cascades/util"], ) go_test( diff --git a/pkg/planner/cascades/base/cascadesctx/BUILD.bazel b/pkg/planner/cascades/base/cascadesctx/BUILD.bazel new file mode 100644 index 0000000000000..8bb70a0524726 --- /dev/null +++ b/pkg/planner/cascades/base/cascadesctx/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "cascadesctx", + srcs = ["cascades_ctx.go"], + importpath = "github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx", + visibility = ["//visibility:public"], + deps = [ + "//pkg/planner/cascades/base", + "//pkg/planner/cascades/memo", + ], +) diff --git a/pkg/planner/cascades/base/cascadesctx/cascades_ctx.go b/pkg/planner/cascades/base/cascadesctx/cascades_ctx.go new file mode 100644 index 0000000000000..918f125c8dc95 --- /dev/null +++ b/pkg/planner/cascades/base/cascadesctx/cascades_ctx.go @@ -0,0 +1,31 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cascadesctx + +import ( + "github.com/pingcap/tidb/pkg/planner/cascades/base" + "github.com/pingcap/tidb/pkg/planner/cascades/memo" +) + +// CascadesContext define the yams context as interface, since it will be defined +// in cascades pkg, which ref task pkg with no doubt. +// while in the task pkg, the concrete task need receive yams context as its +// constructing args, which will lead an import cycle. +type CascadesContext interface { + Destroy() + GetScheduler() base.Scheduler + PushTask(task base.Task) + GetMemo() *memo.Memo +} diff --git a/pkg/planner/cascades/base/task_scheduler_base.go b/pkg/planner/cascades/base/task_scheduler_base.go new file mode 100644 index 0000000000000..067f3d6748dd3 --- /dev/null +++ b/pkg/planner/cascades/base/task_scheduler_base.go @@ -0,0 +1,25 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package base + +// Scheduler is a scheduling interface defined for serializing(single thread)/concurrent(multi thread) running. +type Scheduler interface { + // ExecuteTasks start the internal scheduling. + ExecuteTasks() error + // Destroy release the internal resource if any. + Destroy() + // PushTask is outside portal for inserting a new task in. task running can also trigger another successive task. + PushTask(task Task) +} diff --git a/pkg/planner/cascades/base/task_stack_base.go b/pkg/planner/cascades/base/task_stack_base.go new file mode 100644 index 0000000000000..1ddb400c7afab --- /dev/null +++ b/pkg/planner/cascades/base/task_stack_base.go @@ -0,0 +1,36 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package base + +import ( + "github.com/pingcap/tidb/pkg/planner/cascades/util" +) + +// Stack is abstract definition of task container.(TaskStack is a kind of array stack implementation of it) +type Stack interface { + Push(one Task) + Pop() Task + Empty() bool + Destroy() +} + +// Task is an interface defined for all type of optimizing work: exploring, implementing, +// deriving-stats, join-reordering and so on. +type Task interface { + // Execute task self executing logic + Execute() error + // Desc task self description string. + Desc(w util.StrBufferWriter) +} diff --git a/pkg/planner/cascades/memo/BUILD.bazel b/pkg/planner/cascades/memo/BUILD.bazel index 900fa9a0c386a..8fae94b19bb29 100644 --- a/pkg/planner/cascades/memo/BUILD.bazel +++ b/pkg/planner/cascades/memo/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/planner/core/base", "//pkg/planner/property", "//pkg/util/intest", + "@com_github_bits_and_blooms_bitset//:bitset", "@com_github_pingcap_failpoint//:failpoint", ], ) diff --git a/pkg/planner/cascades/memo/group.go b/pkg/planner/cascades/memo/group.go index c9a7f4a3286d1..8d92e7ba02d5b 100644 --- a/pkg/planner/cascades/memo/group.go +++ b/pkg/planner/cascades/memo/group.go @@ -180,3 +180,23 @@ func NewGroup(prop *property.LogicalProperty) *Group { } return g } + +// GetLogicalProperty return this group's logical property. +func (g *Group) GetLogicalProperty() *property.LogicalProperty { + return g.logicalProp +} + +// SetLogicalProperty set this group's logical property. +func (g *Group) SetLogicalProperty(prop *property.LogicalProperty) { + g.logicalProp = prop +} + +// IsExplored returns whether this group is explored. +func (g *Group) IsExplored() bool { + return g.explored +} + +// SetExplored set the group as tagged as explored. +func (g *Group) SetExplored() { + g.explored = true +} diff --git a/pkg/planner/cascades/memo/group_expr.go b/pkg/planner/cascades/memo/group_expr.go index 5135bdc4c53c1..c10cb2753c4c0 100644 --- a/pkg/planner/cascades/memo/group_expr.go +++ b/pkg/planner/cascades/memo/group_expr.go @@ -15,6 +15,7 @@ package memo import ( + "github.com/bits-and-blooms/bitset" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/expression" base2 "github.com/pingcap/tidb/pkg/planner/cascades/base" @@ -42,6 +43,15 @@ type GroupExpression struct { // hash64 is the unique fingerprint of the GroupExpression. hash64 uint64 + + // mask indicate what rules have been applied in this group expression. + mask *bitset.BitSet + + // abandoned is used in a case, when this gE has been encapsulated (say) 3 tasks + // and pushed into the task, this 3 task are all referring to this same gE, one + // of them has been substituted halfway, the successive task waiting on the task + // should feel this gE is out of date, and this task is abandoned. + abandoned bool } // GetGroup returns the Group that this GroupExpression belongs to. @@ -117,6 +127,8 @@ func NewGroupExpression(lp base.LogicalPlan, inputs []*Group) *GroupExpression { Inputs: inputs, LogicalPlan: lp, hash64: 0, + // todo: add rule set length + mask: bitset.New(1), } } @@ -126,6 +138,26 @@ func (e *GroupExpression) Init(h base2.Hasher) { e.hash64 = h.Sum64() } +// IsExplored return whether this gE has explored rule i. +func (e *GroupExpression) IsExplored(i uint) bool { + return e.mask.Test(i) +} + +// SetExplored set this gE as explored in rule i. +func (e *GroupExpression) SetExplored(i uint) { + e.mask.Set(i) +} + +// IsAbandoned returns whether this gE is abandoned. +func (e *GroupExpression) IsAbandoned() bool { + return e.abandoned +} + +// SetAbandoned set this gE as abandoned. +func (e *GroupExpression) SetAbandoned() { + e.abandoned = true +} + // DeriveLogicalProp derive the new group's logical property from a specific GE. // DeriveLogicalProp is not called with recursive, because we only examine and // init new group from bottom-up, so we can sure that this new group's children diff --git a/pkg/planner/cascades/task/BUILD.bazel b/pkg/planner/cascades/task/BUILD.bazel index 869dd5919a5b4..477880716fa3c 100644 --- a/pkg/planner/cascades/task/BUILD.bazel +++ b/pkg/planner/cascades/task/BUILD.bazel @@ -3,11 +3,22 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "task", srcs = [ + "base.go", "task.go", + "task_apply_rule.go", + "task_opt_group.go", + "task_opt_group_expression.go", "task_scheduler.go", ], importpath = "github.com/pingcap/tidb/pkg/planner/cascades/task", visibility = ["//visibility:public"], + deps = [ + "//pkg/planner/cascades/base", + "//pkg/planner/cascades/base/cascadesctx", + "//pkg/planner/cascades/memo", + "//pkg/planner/cascades/rule", + "//pkg/planner/cascades/util", + ], ) go_test( @@ -20,5 +31,9 @@ go_test( embed = [":task"], flaky = True, shard_count = 3, - deps = ["@com_github_stretchr_testify//require"], + deps = [ + "//pkg/planner/cascades/base", + "//pkg/planner/cascades/util", + "@com_github_stretchr_testify//require", + ], ) diff --git a/pkg/planner/cascades/task/base.go b/pkg/planner/cascades/task/base.go new file mode 100644 index 0000000000000..ed6cb615ee2cf --- /dev/null +++ b/pkg/planner/cascades/task/base.go @@ -0,0 +1,30 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "github.com/pingcap/tidb/pkg/planner/cascades/base" + "github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx" +) + +// BaseTask is base task wrapper structure for encapsulating basic things. +type BaseTask struct { + ctx cascadesctx.CascadesContext +} + +// Push pushes a new task into inside stack. +func (b *BaseTask) Push(t base.Task) { + b.ctx.PushTask(t) +} diff --git a/pkg/planner/cascades/task/task.go b/pkg/planner/cascades/task/task.go index 72e4fcd0b9ad8..00e9b00887d7f 100644 --- a/pkg/planner/cascades/task/task.go +++ b/pkg/planner/cascades/task/task.go @@ -15,70 +15,53 @@ package task import ( - "strings" "sync" -) - -// Task is an interface defined for all type of optimizing work: exploring, implementing, -// deriving-stats, join-reordering and so on. -type Task interface { - // task self executing logic - execute() error - // task self description string. - desc() string -} -// Stack is abstract definition of task container.(TaskStack is a kind of array stack implementation of it) -type Stack interface { - Push(one Task) - Pop() Task - Empty() bool - Destroy() -} + "github.com/pingcap/tidb/pkg/planner/cascades/base" + "github.com/pingcap/tidb/pkg/planner/cascades/util" +) -// StackTaskPool is initialized for memory saving by reusing taskStack. -var StackTaskPool = sync.Pool{ +// stackPool is initialized for memory saving by reusing taskStack. +var stackPool = sync.Pool{ New: func() any { return newTaskStack() }, } // TaskStack is used to store the optimizing tasks created before or during the optimizing process. -type taskStack struct { - tasks []Task +type TaskStack struct { + tasks []base.Task } -func newTaskStack() *taskStack { - return &taskStack{ - tasks: make([]Task, 0, 4), +func newTaskStack() *TaskStack { + return &TaskStack{ + tasks: make([]base.Task, 0, 4), } } // Destroy indicates that when stack itself is useless like in the end of optimizing phase, we can destroy ourselves. -func (ts *taskStack) Destroy() { +func (ts *TaskStack) Destroy() { // when a taskStack itself is useless, we can destroy itself actively. clear(ts.tasks) - StackTaskPool.Put(ts) + stackPool.Put(ts) } // Desc is used to desc the detail info about current stack state. // when use customized stack to drive the tasks, the call-chain state is dived in the stack. -func (ts *taskStack) Desc() string { - var str strings.Builder +func (ts *TaskStack) Desc(w util.StrBufferWriter) { for _, one := range ts.tasks { - str.WriteString(one.desc()) - str.WriteString("\n") + one.Desc(w) + w.WriteString("\n") } - return str.String() } // Len indicates the length of current stack. -func (ts *taskStack) Len() int { +func (ts *TaskStack) Len() int { return len(ts.tasks) } // Pop indicates to pop one task out of the stack. -func (ts *taskStack) Pop() Task { +func (ts *TaskStack) Pop() base.Task { if !ts.Empty() { tmp := ts.tasks[len(ts.tasks)-1] ts.tasks = ts.tasks[:len(ts.tasks)-1] @@ -88,18 +71,18 @@ func (ts *taskStack) Pop() Task { } // Push indicates to push one task into the stack. -func (ts *taskStack) Push(one Task) { +func (ts *TaskStack) Push(one base.Task) { ts.tasks = append(ts.tasks, one) } // Empty indicates whether taskStack is empty. -func (ts *taskStack) Empty() bool { +func (ts *TaskStack) Empty() bool { return ts.Len() == 0 } // BenchTest required. -func newTaskStackWithCap(c int) *taskStack { - return &taskStack{ - tasks: make([]Task, 0, c), +func newTaskStackWithCap(c int) *TaskStack { + return &TaskStack{ + tasks: make([]base.Task, 0, c), } } diff --git a/pkg/planner/cascades/task/task_apply_rule.go b/pkg/planner/cascades/task/task_apply_rule.go new file mode 100644 index 0000000000000..6a2e0893777ca --- /dev/null +++ b/pkg/planner/cascades/task/task_apply_rule.go @@ -0,0 +1,135 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "github.com/pingcap/tidb/pkg/planner/cascades/base" + "github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx" + "github.com/pingcap/tidb/pkg/planner/cascades/memo" + "github.com/pingcap/tidb/pkg/planner/cascades/rule" + "github.com/pingcap/tidb/pkg/planner/cascades/util" +) + +var _ base.Task = &ApplyRuleTask{} + +// Document: +// Currently we introduce stack-based task scheduler for running the memo optimizing. This way is +// lightweight for call deeper chain, especially when the tree is beyond the recursive limit. Besides, +// all the optimizing logic is encapsulated as Task unit, which is running transparent and resource +// isolated internally. +// +// First, we are optimizing the root node from the memo tree downward, so we got the only one task as +// OptGroupTask{root}, inside which, the consecutive downward Tasks will be triggered and encapsulated +// and pushed into the singleton stack continuously. Different task type may trigger additional task +// generation depend on how the Execute interface is implemented. +// +// Currently, here is how we work. +// +// Singleton Task Stack +// ┌────┬────┬────┐ +// │ │ │ │ +// ┌───┼────┼────┼────┼──────────────────────────────────────── +// │ ┌─┼─┐┌─▼─┐┌─▼─┐┌─▼─┐┌───┐┌───┐┌───┐ +// │ │ A ││ B ││ B ││ B ││ C ││ C ││ A │ Open Task Stack... +// │ └───┘└───┘└───┘└─┼─┘└─▲─┘└─▲─┘└─▲─┘ +// └──────────────────┼────┼────┼────┼───────────────────────── +// │ │ │ │ +// └────┴────┴────┘ +// Symbol means: +// A represent OptGroupTask +// B represent OptGroupExpressionTask +// C represent ApplyRuleTask +// +// When memo init is done, the only targeted task is OptGroupTask, say we got +// 3 group expression inside this group, it will trigger and push additional +// 3 OptGroupExpressionTask into the stack. Then task A is wiped out from the +// stack. With the FILO rule, the stack-top B will be popped out and run, from +// which it will find valid rules for its member group expression and encapsulate +// ApplyRuleTask for each of those valid rules. Say we got two valid rules here, +// so it will push another two task with type C into the stack, note, since current +// B's child group hasn't been optimized yet, so the cascades task A will be triggered +// and pushed into the stack as well, and they are queued after rule tasks. then +// the toppest B is wiped out from the stack. +// +// At last, when the stack is out of task calling internally, or forcible mechanism +// is called from the outside, this stack running will be stopped. +// +// State Flow: +// ┌── Opt 4 New Group Expression ──┐ +// │ │ +// ┌────────────────┐ ┌────────────▼───────────┐ ┌───────┴───────┐ +// │ optGroupTask │ ───────► │ optGroupExpressionTask │ ───────► │ ApplyRuleTask │ +// └──────▲─────────┘ └────────────┬───────────┘ └───────────────┘ +// │ │ +// └───── Child Opt Group Trigger ─────┘ +// + +// ApplyRuleTask is basic logic union of scheduling apply rule. +type ApplyRuleTask struct { + BaseTask + + gE *memo.GroupExpression + rule rule.Rule + // currently we are all explore type tasks. +} + +// NewApplyRuleTask return a new apply rule task. +func NewApplyRuleTask(ctx cascadesctx.CascadesContext, gE *memo.GroupExpression, r rule.Rule) *ApplyRuleTask { + return &ApplyRuleTask{ + BaseTask: BaseTask{ + ctx: ctx, + }, + gE: gE, + rule: r, + } +} + +// Execute implements the task.Execute interface. +func (a *ApplyRuleTask) Execute() error { + // check whether this rule has been applied in this gE or this gE is abandoned. + if a.gE.IsExplored(a.rule.ID()) || a.gE.IsAbandoned() { + return nil + } + pa := a.rule.Pattern() + binder := rule.NewBinder(pa, a.gE) + for holder := binder.Next(); holder != nil; { + if !a.rule.PreCheck(holder) { + continue + } + newExprs, err := a.rule.XForm(holder) + if err != nil { + return err + } + for _, ne := range newExprs { + newGroupExpr, err := a.ctx.GetMemo().CopyIn(a.gE.GetGroup(), ne) + if err != nil { + return err + } + // YAMS only care about logical plan now. + a.Push(NewOptGroupExpressionTask(a.ctx, newGroupExpr)) + } + } + a.gE.SetExplored(a.rule.ID()) + return nil +} + +// Desc implements the task.Desc interface. +func (a *ApplyRuleTask) Desc(w util.StrBufferWriter) { + w.WriteString("ApplyRuleTask{gE:") + a.gE.String(w) + w.WriteString(", rule:") + a.rule.String(w) + w.WriteString("}") +} diff --git a/pkg/planner/cascades/task/task_opt_group.go b/pkg/planner/cascades/task/task_opt_group.go new file mode 100644 index 0000000000000..63b1b0439d771 --- /dev/null +++ b/pkg/planner/cascades/task/task_opt_group.go @@ -0,0 +1,56 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "github.com/pingcap/tidb/pkg/planner/cascades/base" + "github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx" + "github.com/pingcap/tidb/pkg/planner/cascades/memo" + "github.com/pingcap/tidb/pkg/planner/cascades/util" +) + +var _ base.Task = &OptGroupTask{} + +type OptGroupTask struct { + BaseTask + + group *memo.Group +} + +// NewOptGroupTask returns a new optimizing group task. +func NewOptGroupTask(ctx cascadesctx.CascadesContext, g *memo.Group) base.Task { + return &OptGroupTask{BaseTask: BaseTask{ + ctx: ctx, + }, group: g} +} + +// Execute implements the task.Execute interface. +func (g *OptGroupTask) Execute() error { + if g.group.IsExplored() { + return nil + } + g.group.ForEachGE(func(ge *memo.GroupExpression) { + g.Push(NewOptGroupExpressionTask(g.ctx, ge)) + }) + g.group.SetExplored() + return nil +} + +// Desc implements the task.Desc interface. +func (g *OptGroupTask) Desc(w util.StrBufferWriter) { + w.WriteString("OptGroupTask{group:") + g.group.String(w) + w.WriteString("}") +} diff --git a/pkg/planner/cascades/task/task_opt_group_expression.go b/pkg/planner/cascades/task/task_opt_group_expression.go new file mode 100644 index 0000000000000..9340497eadf4f --- /dev/null +++ b/pkg/planner/cascades/task/task_opt_group_expression.go @@ -0,0 +1,67 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task + +import ( + "github.com/pingcap/tidb/pkg/planner/cascades/base" + "github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx" + "github.com/pingcap/tidb/pkg/planner/cascades/memo" + "github.com/pingcap/tidb/pkg/planner/cascades/rule" + "github.com/pingcap/tidb/pkg/planner/cascades/util" +) + +var _ base.Task = &OptGroupTask{} + +type OptGroupExpressionTask struct { + BaseTask + + groupExpression *memo.GroupExpression + // currently for each opt expression, it should be explore-type. +} + +// NewOptGroupExpressionTask return a targeting optimizing group expression task. +func NewOptGroupExpressionTask(ctx cascadesctx.CascadesContext, ge *memo.GroupExpression) *OptGroupExpressionTask { + return &OptGroupExpressionTask{ + BaseTask: BaseTask{ctx: ctx}, + groupExpression: ge, + } +} + +// Execute implements the task.Execute interface. +func (ge *OptGroupExpressionTask) Execute() error { + ruleList := ge.getValidRules() + for _, one := range ruleList { + ge.Push(NewApplyRuleTask(ge.ctx, ge.groupExpression, one)) + } + // since it's a stack-order, LUFO, when we want to apply a rule for a specific group expression, + // the pre-condition is that this group expression's child group has been fully explored. + for i := len(ge.groupExpression.Inputs) - 1; i >= 0; i-- { + ge.Push(NewOptGroupTask(ge.ctx, ge.groupExpression.Inputs[i])) + } + return nil +} + +// Desc implements the task.Desc interface. +func (ge *OptGroupExpressionTask) Desc(w util.StrBufferWriter) { + w.WriteString("OptGroupExpressionTask{ge:") + ge.groupExpression.String(w) + w.WriteString("}") +} + +// getValidRules filter the allowed rule from session variable, and system config. +func (ge *OptGroupExpressionTask) getValidRules() []rule.Rule { + // todo: add rule set + return []rule.Rule{} +} diff --git a/pkg/planner/cascades/task/task_scheduler.go b/pkg/planner/cascades/task/task_scheduler.go index 5b8da70de5436..112b7c96aed73 100644 --- a/pkg/planner/cascades/task/task_scheduler.go +++ b/pkg/planner/cascades/task/task_scheduler.go @@ -14,42 +14,44 @@ package task -var _ Scheduler = &SimpleTaskScheduler{} +import "github.com/pingcap/tidb/pkg/planner/cascades/base" -// Scheduler is a scheduling interface defined for serializing(single thread)/concurrent(multi thread) running. -type Scheduler interface { - ExecuteTasks() -} +var _ base.Scheduler = &SimpleTaskScheduler{} // SimpleTaskScheduler is defined for serializing scheduling of memo tasks. type SimpleTaskScheduler struct { - Err error - SchedulerCtx SchedulerContext + stack base.Stack } // ExecuteTasks implements the interface of TaskScheduler. -func (s *SimpleTaskScheduler) ExecuteTasks() { - stack := s.SchedulerCtx.getStack() - defer func() { - // when step out of the scheduler, if the stack is empty, clean and release it. - if !stack.Empty() { - stack.Destroy() - } - }() - for !stack.Empty() { +func (s *SimpleTaskScheduler) ExecuteTasks() error { + for !s.stack.Empty() { // when use customized stack to drive the tasks, the call-chain state is dived in the stack. - task := stack.Pop() - if err := task.execute(); err != nil { - s.Err = err - return + task := s.stack.Pop() + if err := task.Execute(); err != nil { + return err } } + return nil +} + +// Destroy release all the allocated elements inside stack. +func (s *SimpleTaskScheduler) Destroy() { + // when step out of the scheduler, if the stack is empty, clean and release it. + stack := s.stack + // release parent pointer ref. + s.stack = nil + stack.Destroy() } -// SchedulerContext is defined for scheduling logic calling, also facilitate interface-oriented coding and testing. -type SchedulerContext interface { - // we exported the Stack interface here rather than the basic stack implementation. - getStack() Stack - // we exported the only one push action to user, Task is an interface definition. - pushTask(task Task) +// PushTask implements the scheduler's interface, add another task into scheduler. +func (s *SimpleTaskScheduler) PushTask(task base.Task) { + s.stack.Push(task) +} + +// NewSimpleTaskScheduler return a simple task scheduler, init logic included. +func NewSimpleTaskScheduler() base.Scheduler { + return &SimpleTaskScheduler{ + stack: stackPool.Get().(base.Stack), + } } diff --git a/pkg/planner/cascades/task/task_scheduler_test.go b/pkg/planner/cascades/task/task_scheduler_test.go index 948b45c82ef8a..cbdbfa0f4ce6f 100644 --- a/pkg/planner/cascades/task/task_scheduler_test.go +++ b/pkg/planner/cascades/task/task_scheduler_test.go @@ -19,19 +19,21 @@ import ( "strconv" "testing" + "github.com/pingcap/tidb/pkg/planner/cascades/base" + "github.com/pingcap/tidb/pkg/planner/cascades/util" "github.com/stretchr/testify/require" ) // TestSchedulerContext is defined to test scheduling logic here. type TestSchedulerContext struct { - ts *taskStack + ts *TaskStack } -func (t *TestSchedulerContext) getStack() Stack { +func (t *TestSchedulerContext) getStack() base.Stack { return t.ts } -func (t *TestSchedulerContext) pushTask(task Task) { +func (t *TestSchedulerContext) pushTask(task base.Task) { t.ts.Push(task) } @@ -40,7 +42,7 @@ type TestTaskImpl2 struct { a int64 } -func (t *TestTaskImpl2) execute() error { +func (t *TestTaskImpl2) Execute() error { // mock error at special task if t.a == 2 { return errors.New("mock error at task id = 2") @@ -48,23 +50,17 @@ func (t *TestTaskImpl2) execute() error { return nil } -func (t *TestTaskImpl2) desc() string { - return strconv.Itoa(int(t.a)) +func (t *TestTaskImpl2) Desc(w util.StrBufferWriter) { + w.WriteString(strconv.Itoa(int(t.a))) } func TestSimpleTaskScheduler(t *testing.T) { - testSchedulerContext := &TestSchedulerContext{ - newTaskStack(), - } - testScheduler := &SimpleTaskScheduler{ - SchedulerCtx: testSchedulerContext, - } - testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 1}) - testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 2}) - testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 3}) + testScheduler := NewSimpleTaskScheduler() + testScheduler.PushTask(&TestTaskImpl2{a: 1}) + testScheduler.PushTask(&TestTaskImpl2{a: 2}) + testScheduler.PushTask(&TestTaskImpl2{a: 3}) - var testTaskScheduler Scheduler = testScheduler - testTaskScheduler.ExecuteTasks() - require.NotNil(t, testScheduler.Err) - require.Equal(t, testScheduler.Err.Error(), "mock error at task id = 2") + err := testScheduler.ExecuteTasks() + require.NotNil(t, err) + require.Equal(t, err.Error(), "mock error at task id = 2") } diff --git a/pkg/planner/cascades/task/task_test.go b/pkg/planner/cascades/task/task_test.go index 2d9673bf19d34..60df6ade76c24 100644 --- a/pkg/planner/cascades/task/task_test.go +++ b/pkg/planner/cascades/task/task_test.go @@ -15,10 +15,13 @@ package task import ( + "bytes" "strconv" "testing" "unsafe" + "github.com/pingcap/tidb/pkg/planner/cascades/base" + "github.com/pingcap/tidb/pkg/planner/cascades/util" "github.com/stretchr/testify/require" ) @@ -26,11 +29,11 @@ type TestTaskImpl struct { a int64 } -func (t *TestTaskImpl) execute() error { +func (t *TestTaskImpl) Execute() error { return nil } -func (t *TestTaskImpl) desc() string { - return strconv.Itoa(int(t.a)) +func (t *TestTaskImpl) Desc(w util.StrBufferWriter) { + w.WriteString(strconv.Itoa(int(t.a))) } func TestTaskStack(t *testing.T) { @@ -50,16 +53,23 @@ func TestTaskStack(t *testing.T) { } func TestTaskFunctionality(t *testing.T) { - taskTaskPool := StackTaskPool.Get() - require.Equal(t, len(taskTaskPool.(*taskStack).tasks), 0) - require.Equal(t, cap(taskTaskPool.(*taskStack).tasks), 4) - ts := taskTaskPool.(*taskStack) + taskTaskPool := stackPool.Get() + require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 0) + require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4) + ts := taskTaskPool.(*TaskStack) ts.Push(&TestTaskImpl{a: 1}) ts.Push(&TestTaskImpl{a: 2}) one := ts.Pop() - require.Equal(t, one.desc(), "2") + buf := &bytes.Buffer{} + w := util.NewStrBuffer(buf) + one.Desc(w) + w.Flush() + require.Equal(t, buf.String(), "2") one = ts.Pop() - require.Equal(t, one.desc(), "1") + buf.Reset() + one.Desc(w) + w.Flush() + require.Equal(t, buf.String(), "1") // empty, pop nil. one = ts.Pop() require.Nil(t, one) @@ -69,45 +79,57 @@ func TestTaskFunctionality(t *testing.T) { ts.Push(&TestTaskImpl{a: 5}) ts.Push(&TestTaskImpl{a: 6}) // no clean, put it back - StackTaskPool.Put(taskTaskPool) + stackPool.Put(taskTaskPool) // require again. - ts = StackTaskPool.Get().(*taskStack) + ts = stackPool.Get().(*TaskStack) require.Equal(t, len(ts.tasks), 4) require.Equal(t, cap(ts.tasks), 4) // clean the stack one = ts.Pop() - require.Equal(t, one.desc(), "6") + buf.Reset() + one.Desc(w) + w.Flush() + require.Equal(t, buf.String(), "6") one = ts.Pop() - require.Equal(t, one.desc(), "5") + buf.Reset() + one.Desc(w) + w.Flush() + require.Equal(t, buf.String(), "5") one = ts.Pop() - require.Equal(t, one.desc(), "4") + buf.Reset() + one.Desc(w) + w.Flush() + require.Equal(t, buf.String(), "4") one = ts.Pop() - require.Equal(t, one.desc(), "3") + buf.Reset() + one.Desc(w) + w.Flush() + require.Equal(t, buf.String(), "3") one = ts.Pop() require.Nil(t, one) // self destroy. ts.Destroy() - ts = StackTaskPool.Get().(*taskStack) + ts = stackPool.Get().(*TaskStack) require.Equal(t, len(ts.tasks), 0) require.Equal(t, cap(ts.tasks), 4) } // TaskStack2 is used to store the optimizing tasks created before or during the optimizing process. type taskStackForBench struct { - tasks []*Task + tasks []base.Task } func newTaskStackForBenchWithCap(c int) *taskStackForBench { return &taskStackForBench{ - tasks: make([]*Task, 0, c), + tasks: make([]base.Task, 0, c), } } // Push indicates to push one task into the stack. -func (ts *taskStackForBench) Push(one Task) { - ts.tasks = append(ts.tasks, &one) +func (ts *taskStackForBench) Push(one base.Task) { + ts.tasks = append(ts.tasks, one) } // Len indicates the length of current stack. @@ -121,11 +143,11 @@ func (ts *taskStackForBench) Empty() bool { } // Pop indicates to pop one task out of the stack. -func (ts *taskStackForBench) Pop() Task { +func (ts *taskStackForBench) Pop() base.Task { if !ts.Empty() { tmp := ts.tasks[len(ts.tasks)-1] ts.tasks = ts.tasks[:len(ts.tasks)-1] - return *tmp + return tmp } return nil } From 4e26641661b9c2be8d288e714941ec1611e14d4e Mon Sep 17 00:00:00 2001 From: arenatlx <314806019@qq.com> Date: Fri, 6 Dec 2024 15:09:08 +0800 Subject: [PATCH 2/7] . Signed-off-by: arenatlx <314806019@qq.com> --- .../cascades/base/cascadesctx/cascades_ctx.go | 5 +-- pkg/planner/cascades/task/task_apply_rule.go | 32 +++++++++---------- pkg/planner/cascades/task/task_opt_group.go | 1 + .../task/task_opt_group_expression.go | 3 +- 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/pkg/planner/cascades/base/cascadesctx/cascades_ctx.go b/pkg/planner/cascades/base/cascadesctx/cascades_ctx.go index 918f125c8dc95..e9ecc133d88cf 100644 --- a/pkg/planner/cascades/base/cascadesctx/cascades_ctx.go +++ b/pkg/planner/cascades/base/cascadesctx/cascades_ctx.go @@ -19,10 +19,11 @@ import ( "github.com/pingcap/tidb/pkg/planner/cascades/memo" ) -// CascadesContext define the yams context as interface, since it will be defined +// CascadesContext define the cascades context as interface, since it will be defined // in cascades pkg, which ref task pkg with no doubt. -// while in the task pkg, the concrete task need receive yams context as its +// while in the task pkg, the concrete task need receive cascades context as its // constructing args, which will lead an import cycle. +// so that's why we separate it out of base pkg. type CascadesContext interface { Destroy() GetScheduler() base.Scheduler diff --git a/pkg/planner/cascades/task/task_apply_rule.go b/pkg/planner/cascades/task/task_apply_rule.go index 6a2e0893777ca..ca9b253f5cd2d 100644 --- a/pkg/planner/cascades/task/task_apply_rule.go +++ b/pkg/planner/cascades/task/task_apply_rule.go @@ -30,10 +30,10 @@ var _ base.Task = &ApplyRuleTask{} // all the optimizing logic is encapsulated as Task unit, which is running transparent and resource // isolated internally. // -// First, we are optimizing the root node from the memo tree downward, so we got the only one task as -// OptGroupTask{root}, inside which, the consecutive downward Tasks will be triggered and encapsulated -// and pushed into the singleton stack continuously. Different task type may trigger additional task -// generation depend on how the Execute interface is implemented. +// First, we are optimizing the root node from the memo tree downward, at the beginning we got the only +// one task as OptGroupTask{root}, inside which, the consecutive downward Tasks will be triggered and +// encapsulated and pushed into the singleton stack continuously. Different task type may trigger an +// additional task generation depend on how the Execute interface is implemented. // // Currently, here is how we work. // @@ -52,19 +52,17 @@ var _ base.Task = &ApplyRuleTask{} // B represent OptGroupExpressionTask // C represent ApplyRuleTask // -// When memo init is done, the only targeted task is OptGroupTask, say we got -// 3 group expression inside this group, it will trigger and push additional -// 3 OptGroupExpressionTask into the stack. Then task A is wiped out from the -// stack. With the FILO rule, the stack-top B will be popped out and run, from -// which it will find valid rules for its member group expression and encapsulate -// ApplyRuleTask for each of those valid rules. Say we got two valid rules here, -// so it will push another two task with type C into the stack, note, since current -// B's child group hasn't been optimized yet, so the cascades task A will be triggered -// and pushed into the stack as well, and they are queued after rule tasks. then -// the toppest B is wiped out from the stack. +// When memo init is done, the only targeted task is OptGroupTask, say we got 3 group expression inside +// this group, it will trigger and push additional 3 OptGroupExpressionTask into the stack when running +// A. Then task A is wiped out from the stack. With the FILO rule, the stack-top B will be popped out and +// run, from which it will find valid rules for its member group expression and encapsulate ApplyRuleTask +// for each of those valid rules. Say we got two valid rules here, so it will push another two task with +// type C into the stack, note, since current B's child group hasn't been optimized yet, so the cascaded +// task A will be triggered and pushed into the stack as well, and they are queued after rule tasks. then +// the old toppest B is wiped out from the stack. // -// At last, when the stack is out of task calling internally, or forcible mechanism -// is called from the outside, this stack running will be stopped. +// At last, when the stack is running out of task calling internally, or forcible mechanism is called from +// the outside, this stack running will be stopped. // // State Flow: // ┌── Opt 4 New Group Expression ──┐ @@ -76,7 +74,7 @@ var _ base.Task = &ApplyRuleTask{} // └───── Child Opt Group Trigger ─────┘ // -// ApplyRuleTask is basic logic union of scheduling apply rule. +// ApplyRuleTask is a wrapper of running basic logic union of scheduling apply rule. type ApplyRuleTask struct { BaseTask diff --git a/pkg/planner/cascades/task/task_opt_group.go b/pkg/planner/cascades/task/task_opt_group.go index 63b1b0439d771..942e075a1c5be 100644 --- a/pkg/planner/cascades/task/task_opt_group.go +++ b/pkg/planner/cascades/task/task_opt_group.go @@ -23,6 +23,7 @@ import ( var _ base.Task = &OptGroupTask{} +// OptGroupTask is a wrapper of running logic of exploring a group. type OptGroupTask struct { BaseTask diff --git a/pkg/planner/cascades/task/task_opt_group_expression.go b/pkg/planner/cascades/task/task_opt_group_expression.go index 9340497eadf4f..c9ae2d8e36533 100644 --- a/pkg/planner/cascades/task/task_opt_group_expression.go +++ b/pkg/planner/cascades/task/task_opt_group_expression.go @@ -24,6 +24,7 @@ import ( var _ base.Task = &OptGroupTask{} +// OptGroupExpressionTask is a wrapper of running logic of exploring group expression. type OptGroupExpressionTask struct { BaseTask @@ -45,7 +46,7 @@ func (ge *OptGroupExpressionTask) Execute() error { for _, one := range ruleList { ge.Push(NewApplyRuleTask(ge.ctx, ge.groupExpression, one)) } - // since it's a stack-order, LUFO, when we want to apply a rule for a specific group expression, + // since it's a stack-order, LIFO, when we want to apply a rule for a specific group expression, // the pre-condition is that this group expression's child group has been fully explored. for i := len(ge.groupExpression.Inputs) - 1; i >= 0; i-- { ge.Push(NewOptGroupTask(ge.ctx, ge.groupExpression.Inputs[i])) From 5da0f60147af41febfe02bfe7d98c46cd80b95bd Mon Sep 17 00:00:00 2001 From: arenatlx <314806019@qq.com> Date: Fri, 6 Dec 2024 15:36:13 +0800 Subject: [PATCH 3/7] . Signed-off-by: arenatlx <314806019@qq.com> --- pkg/planner/cascades/task/task.go | 24 +++++++++---------- .../cascades/task/task_scheduler_test.go | 2 +- pkg/planner/cascades/task/task_test.go | 16 ++++++------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/pkg/planner/cascades/task/task.go b/pkg/planner/cascades/task/task.go index 00e9b00887d7f..fc42819281f15 100644 --- a/pkg/planner/cascades/task/task.go +++ b/pkg/planner/cascades/task/task.go @@ -28,19 +28,19 @@ var stackPool = sync.Pool{ }, } -// TaskStack is used to store the optimizing tasks created before or during the optimizing process. -type TaskStack struct { +// Stack is used to store the optimizing tasks created before or during the optimizing process. +type Stack struct { tasks []base.Task } -func newTaskStack() *TaskStack { - return &TaskStack{ +func newTaskStack() *Stack { + return &Stack{ tasks: make([]base.Task, 0, 4), } } // Destroy indicates that when stack itself is useless like in the end of optimizing phase, we can destroy ourselves. -func (ts *TaskStack) Destroy() { +func (ts *Stack) Destroy() { // when a taskStack itself is useless, we can destroy itself actively. clear(ts.tasks) stackPool.Put(ts) @@ -48,7 +48,7 @@ func (ts *TaskStack) Destroy() { // Desc is used to desc the detail info about current stack state. // when use customized stack to drive the tasks, the call-chain state is dived in the stack. -func (ts *TaskStack) Desc(w util.StrBufferWriter) { +func (ts *Stack) Desc(w util.StrBufferWriter) { for _, one := range ts.tasks { one.Desc(w) w.WriteString("\n") @@ -56,12 +56,12 @@ func (ts *TaskStack) Desc(w util.StrBufferWriter) { } // Len indicates the length of current stack. -func (ts *TaskStack) Len() int { +func (ts *Stack) Len() int { return len(ts.tasks) } // Pop indicates to pop one task out of the stack. -func (ts *TaskStack) Pop() base.Task { +func (ts *Stack) Pop() base.Task { if !ts.Empty() { tmp := ts.tasks[len(ts.tasks)-1] ts.tasks = ts.tasks[:len(ts.tasks)-1] @@ -71,18 +71,18 @@ func (ts *TaskStack) Pop() base.Task { } // Push indicates to push one task into the stack. -func (ts *TaskStack) Push(one base.Task) { +func (ts *Stack) Push(one base.Task) { ts.tasks = append(ts.tasks, one) } // Empty indicates whether taskStack is empty. -func (ts *TaskStack) Empty() bool { +func (ts *Stack) Empty() bool { return ts.Len() == 0 } // BenchTest required. -func newTaskStackWithCap(c int) *TaskStack { - return &TaskStack{ +func newTaskStackWithCap(c int) *Stack { + return &Stack{ tasks: make([]base.Task, 0, c), } } diff --git a/pkg/planner/cascades/task/task_scheduler_test.go b/pkg/planner/cascades/task/task_scheduler_test.go index cbdbfa0f4ce6f..4137617d86bb6 100644 --- a/pkg/planner/cascades/task/task_scheduler_test.go +++ b/pkg/planner/cascades/task/task_scheduler_test.go @@ -26,7 +26,7 @@ import ( // TestSchedulerContext is defined to test scheduling logic here. type TestSchedulerContext struct { - ts *TaskStack + ts *Stack } func (t *TestSchedulerContext) getStack() base.Stack { diff --git a/pkg/planner/cascades/task/task_test.go b/pkg/planner/cascades/task/task_test.go index 60df6ade76c24..972e0dbac6b9e 100644 --- a/pkg/planner/cascades/task/task_test.go +++ b/pkg/planner/cascades/task/task_test.go @@ -38,11 +38,11 @@ func (t *TestTaskImpl) Desc(w util.StrBufferWriter) { func TestTaskStack(t *testing.T) { newSS := newTaskStack() - // size of pointer to TaskStack{} + // size of pointer to Stack{} require.Equal(t, int64(unsafe.Sizeof(newSS)), int64(8)) - // size of pointer to TaskStack.[]Task, cap + len + addr + // size of pointer to Stack.[]Task, cap + len + addr require.Equal(t, int64(unsafe.Sizeof(newSS.tasks)), int64(24)) - // size of pointer to TaskStack's first element Task[0] + // size of pointer to Stack's first element Task[0] newSS.Push(nil) newSS.Push(&TestTaskImpl{a: 1}) newSS.Push(nil) @@ -54,9 +54,9 @@ func TestTaskStack(t *testing.T) { func TestTaskFunctionality(t *testing.T) { taskTaskPool := stackPool.Get() - require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 0) - require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4) - ts := taskTaskPool.(*TaskStack) + require.Equal(t, len(taskTaskPool.(*Stack).tasks), 0) + require.Equal(t, cap(taskTaskPool.(*Stack).tasks), 4) + ts := taskTaskPool.(*Stack) ts.Push(&TestTaskImpl{a: 1}) ts.Push(&TestTaskImpl{a: 2}) one := ts.Pop() @@ -82,7 +82,7 @@ func TestTaskFunctionality(t *testing.T) { stackPool.Put(taskTaskPool) // require again. - ts = stackPool.Get().(*TaskStack) + ts = stackPool.Get().(*Stack) require.Equal(t, len(ts.tasks), 4) require.Equal(t, cap(ts.tasks), 4) // clean the stack @@ -111,7 +111,7 @@ func TestTaskFunctionality(t *testing.T) { // self destroy. ts.Destroy() - ts = stackPool.Get().(*TaskStack) + ts = stackPool.Get().(*Stack) require.Equal(t, len(ts.tasks), 0) require.Equal(t, cap(ts.tasks), 4) } From 2dafa1e0a86b9d70a7f57ee48adacbdc5974b04f Mon Sep 17 00:00:00 2001 From: arenatlx <314806019@qq.com> Date: Fri, 6 Dec 2024 17:20:36 +0800 Subject: [PATCH 4/7] . Signed-off-by: arenatlx <314806019@qq.com> --- pkg/planner/cascades/memo/group.go | 12 ++++++++++++ pkg/planner/cascades/task/task_apply_rule.go | 5 +---- pkg/planner/cascades/task/task_opt_group.go | 3 ++- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/planner/cascades/memo/group.go b/pkg/planner/cascades/memo/group.go index 8d92e7ba02d5b..55d78609b59a8 100644 --- a/pkg/planner/cascades/memo/group.go +++ b/pkg/planner/cascades/memo/group.go @@ -200,3 +200,15 @@ func (g *Group) IsExplored() bool { func (g *Group) SetExplored() { g.explored = true } + +// ForEachGE traverse the inside group expression with f call on them each. +func (g *Group) ForEachGE(f func(ge *GroupExpression) bool) { + var next bool + for elem := g.logicalExpressions.Front(); elem != nil; elem = elem.Next() { + expr := elem.Value.(*GroupExpression) + next = f(expr) + if !next { + break + } + } +} diff --git a/pkg/planner/cascades/task/task_apply_rule.go b/pkg/planner/cascades/task/task_apply_rule.go index ca9b253f5cd2d..221e215b7870d 100644 --- a/pkg/planner/cascades/task/task_apply_rule.go +++ b/pkg/planner/cascades/task/task_apply_rule.go @@ -111,10 +111,7 @@ func (a *ApplyRuleTask) Execute() error { return err } for _, ne := range newExprs { - newGroupExpr, err := a.ctx.GetMemo().CopyIn(a.gE.GetGroup(), ne) - if err != nil { - return err - } + newGroupExpr := a.ctx.GetMemo().CopyIn(a.gE.GetGroup(), ne) // YAMS only care about logical plan now. a.Push(NewOptGroupExpressionTask(a.ctx, newGroupExpr)) } diff --git a/pkg/planner/cascades/task/task_opt_group.go b/pkg/planner/cascades/task/task_opt_group.go index 942e075a1c5be..8357fa5fbfbcb 100644 --- a/pkg/planner/cascades/task/task_opt_group.go +++ b/pkg/planner/cascades/task/task_opt_group.go @@ -42,8 +42,9 @@ func (g *OptGroupTask) Execute() error { if g.group.IsExplored() { return nil } - g.group.ForEachGE(func(ge *memo.GroupExpression) { + g.group.ForEachGE(func(ge *memo.GroupExpression) bool { g.Push(NewOptGroupExpressionTask(g.ctx, ge)) + return true }) g.group.SetExplored() return nil From f33d06d118a66827cf5675bd888aaec74c0407b5 Mon Sep 17 00:00:00 2001 From: arenatlx <314806019@qq.com> Date: Fri, 6 Dec 2024 18:00:33 +0800 Subject: [PATCH 5/7] . Signed-off-by: arenatlx <314806019@qq.com> --- pkg/planner/cascades/task/task_opt_group_expression.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/planner/cascades/task/task_opt_group_expression.go b/pkg/planner/cascades/task/task_opt_group_expression.go index c9ae2d8e36533..260facbb24cf7 100644 --- a/pkg/planner/cascades/task/task_opt_group_expression.go +++ b/pkg/planner/cascades/task/task_opt_group_expression.go @@ -62,7 +62,7 @@ func (ge *OptGroupExpressionTask) Desc(w util.StrBufferWriter) { } // getValidRules filter the allowed rule from session variable, and system config. -func (ge *OptGroupExpressionTask) getValidRules() []rule.Rule { +func (*OptGroupExpressionTask) getValidRules() []rule.Rule { // todo: add rule set return []rule.Rule{} } From a7df189a49582e847160a4050c91e2379ea1a1b6 Mon Sep 17 00:00:00 2001 From: AilinKid <314806019@qq.com> Date: Thu, 12 Dec 2024 10:55:58 +0800 Subject: [PATCH 6/7] . Signed-off-by: AilinKid <314806019@qq.com> --- pkg/planner/cascades/memo/group.go | 32 ------------------------------ 1 file changed, 32 deletions(-) diff --git a/pkg/planner/cascades/memo/group.go b/pkg/planner/cascades/memo/group.go index 55d78609b59a8..c9a7f4a3286d1 100644 --- a/pkg/planner/cascades/memo/group.go +++ b/pkg/planner/cascades/memo/group.go @@ -180,35 +180,3 @@ func NewGroup(prop *property.LogicalProperty) *Group { } return g } - -// GetLogicalProperty return this group's logical property. -func (g *Group) GetLogicalProperty() *property.LogicalProperty { - return g.logicalProp -} - -// SetLogicalProperty set this group's logical property. -func (g *Group) SetLogicalProperty(prop *property.LogicalProperty) { - g.logicalProp = prop -} - -// IsExplored returns whether this group is explored. -func (g *Group) IsExplored() bool { - return g.explored -} - -// SetExplored set the group as tagged as explored. -func (g *Group) SetExplored() { - g.explored = true -} - -// ForEachGE traverse the inside group expression with f call on them each. -func (g *Group) ForEachGE(f func(ge *GroupExpression) bool) { - var next bool - for elem := g.logicalExpressions.Front(); elem != nil; elem = elem.Next() { - expr := elem.Value.(*GroupExpression) - next = f(expr) - if !next { - break - } - } -} From 0cf0274d3ea3f5cafb42d9af660ae37c22041461 Mon Sep 17 00:00:00 2001 From: arenatlx <314806019@qq.com> Date: Thu, 12 Dec 2024 15:28:58 +0800 Subject: [PATCH 7/7] . Signed-off-by: arenatlx <314806019@qq.com> --- pkg/planner/cascades/memo/memo.go | 9 ++++++--- pkg/planner/cascades/rule/binder_test.go | 9 ++++++--- pkg/planner/cascades/task/task_apply_rule.go | 5 ++++- pkg/planner/core/casetest/cascades/memo_test.go | 3 ++- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pkg/planner/cascades/memo/memo.go b/pkg/planner/cascades/memo/memo.go index c5c51ab3ffedd..f35b227ca8de4 100644 --- a/pkg/planner/cascades/memo/memo.go +++ b/pkg/planner/cascades/memo/memo.go @@ -155,11 +155,14 @@ func (mm *Memo) NewGroup() *Group { } // Init initializes the memo with a logical plan, converting logical plan tree format into group tree. -func (mm *Memo) Init(plan base.LogicalPlan) *GroupExpression { +func (mm *Memo) Init(plan base.LogicalPlan) (*GroupExpression, error) { intest.Assert(mm.groups.Len() == 0) - gE, _ := mm.CopyIn(nil, plan) + gE, err := mm.CopyIn(nil, plan) + if err != nil { + return nil, err + } mm.rootGroup = gE.GetGroup() - return gE + return gE, nil } // ForEachGroup traverse the inside group expression with f call on them each. diff --git a/pkg/planner/cascades/rule/binder_test.go b/pkg/planner/cascades/rule/binder_test.go index 49fbac0f4d9b7..56d3290b2507d 100644 --- a/pkg/planner/cascades/rule/binder_test.go +++ b/pkg/planner/cascades/rule/binder_test.go @@ -239,7 +239,8 @@ func TestBinderMultiNext(t *testing.T) { t4 := logicalop.DataSource{TableAsName: &asT4}.Init(ctx, 0) mm := memo.NewMemo() - gE := mm.Init(join1) + gE, err := mm.Init(join1) + require.Nil(t, err) // which means t1 and t3 are equivalent class. mm.CopyIn(gE.Inputs[0], t3) @@ -340,7 +341,8 @@ func TestBinderAny(t *testing.T) { t4 := logicalop.DataSource{TableAsName: &asT4}.Init(ctx, 0) mm := memo.NewMemo() - gE := mm.Init(join1) + gE, err := mm.Init(join1) + require.Nil(t, err) // which means t1 and t3 are equivalent class. mm.CopyIn(gE.Inputs[0], t3) @@ -427,7 +429,8 @@ func TestBinderMultiAny(t *testing.T) { t4 := logicalop.DataSource{TableAsName: &asT4}.Init(ctx, 0) mm := memo.NewMemo() - gE := mm.Init(join1) + gE, err := mm.Init(join1) + require.Nil(t, err) // which means t1 and t3 are equivalent class. mm.CopyIn(gE.Inputs[0], t3) diff --git a/pkg/planner/cascades/task/task_apply_rule.go b/pkg/planner/cascades/task/task_apply_rule.go index 221e215b7870d..ca9b253f5cd2d 100644 --- a/pkg/planner/cascades/task/task_apply_rule.go +++ b/pkg/planner/cascades/task/task_apply_rule.go @@ -111,7 +111,10 @@ func (a *ApplyRuleTask) Execute() error { return err } for _, ne := range newExprs { - newGroupExpr := a.ctx.GetMemo().CopyIn(a.gE.GetGroup(), ne) + newGroupExpr, err := a.ctx.GetMemo().CopyIn(a.gE.GetGroup(), ne) + if err != nil { + return err + } // YAMS only care about logical plan now. a.Push(NewOptGroupExpressionTask(a.ctx, newGroupExpr)) } diff --git a/pkg/planner/core/casetest/cascades/memo_test.go b/pkg/planner/core/casetest/cascades/memo_test.go index dae0149f20db5..6143b3a3387e6 100644 --- a/pkg/planner/core/casetest/cascades/memo_test.go +++ b/pkg/planner/core/casetest/cascades/memo_test.go @@ -72,7 +72,8 @@ func TestDeriveStats(t *testing.T) { // after stats derive is done, which means the up-down propagation of group ndv is done, in bottom-up building phase // of memo, we don't have to expect the upper operator's group cols passing down anymore. mm := memo.NewMemo() - mm.Init(lp) + _, err = mm.Init(lp) + require.Nil(t, err) // check the stats state in memo group. b := &bytes.Buffer{} sb := util.NewStrBuffer(b)