From a66a80ef23456bcd22e067da0d995c17e4eca83f Mon Sep 17 00:00:00 2001 From: Arenatlx <314806019@qq.com> Date: Mon, 25 Mar 2024 19:15:46 +0800 Subject: [PATCH] planner: add simple serializing scheduler (#51866) ref pingcap/tidb#51664 --- pkg/planner/memo/BUILD.bazel | 4 +- pkg/planner/memo/task.go | 79 ++++++++++++++++++--- pkg/planner/memo/task_scheduler.go | 55 +++++++++++++++ pkg/planner/memo/task_scheduler_test.go | 70 ++++++++++++++++++ pkg/planner/memo/task_test.go | 90 +++++++++++++++++------- pkg/planner/property/logical_property.go | 2 + 6 files changed, 267 insertions(+), 33 deletions(-) create mode 100644 pkg/planner/memo/task_scheduler.go create mode 100644 pkg/planner/memo/task_scheduler_test.go diff --git a/pkg/planner/memo/BUILD.bazel b/pkg/planner/memo/BUILD.bazel index d78fbd7b42549..25219a1f3ab35 100644 --- a/pkg/planner/memo/BUILD.bazel +++ b/pkg/planner/memo/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "implementation.go", "pattern.go", "task.go", + "task_scheduler.go", ], importpath = "github.com/pingcap/tidb/pkg/planner/memo", visibility = ["//visibility:public"], @@ -28,11 +29,12 @@ go_test( "group_test.go", "main_test.go", "pattern_test.go", + "task_scheduler_test.go", "task_test.go", ], embed = [":memo"], flaky = True, - shard_count = 24, + shard_count = 25, deps = [ "//pkg/domain", "//pkg/expression", diff --git a/pkg/planner/memo/task.go b/pkg/planner/memo/task.go index 4380c69ddaf67..582b5131420eb 100644 --- a/pkg/planner/memo/task.go +++ b/pkg/planner/memo/task.go @@ -15,6 +15,7 @@ package memo import ( + "strings" "sync" ) @@ -26,6 +27,14 @@ type Task interface { desc() string } +// Stack is abstract definition of task container.(TaskStack is a kind of array stack implementation of it) +type Stack interface { + Push(one Task) + Pop() Task + Empty() bool + Destroy() +} + // TaskStackPool is initialized for memory saving by reusing taskStack. var TaskStackPool = sync.Pool{ New: func() any { @@ -34,30 +43,41 @@ var TaskStackPool = sync.Pool{ } // TaskStack is used to store the optimizing tasks created before or during the optimizing process. -type TaskStack struct { +type taskStack struct { tasks []Task } -func newTaskStack() *TaskStack { - return &TaskStack{ +func newTaskStack() *taskStack { + return &taskStack{ tasks: make([]Task, 0, 4), } } // Destroy indicates that when stack itself is useless like in the end of optimizing phase, we can destroy ourselves. -func (ts *TaskStack) Destroy() { +func (ts *taskStack) Destroy() { // when a taskStack itself is useless, we can destroy itself actively. clear(ts.tasks) TaskStackPool.Put(ts) } +// Desc is used to desc the detail info about current stack state. +// when use customized stack to drive the tasks, the call-chain state is dived in the stack. +func (ts *taskStack) Desc() string { + var str strings.Builder + for _, one := range ts.tasks { + str.WriteString(one.desc()) + str.WriteString("\n") + } + return str.String() +} + // Len indicates the length of current stack. -func (ts *TaskStack) Len() int { +func (ts *taskStack) Len() int { return len(ts.tasks) } // Pop indicates to pop one task out of the stack. -func (ts *TaskStack) Pop() Task { +func (ts *taskStack) Pop() Task { if !ts.Empty() { tmp := ts.tasks[len(ts.tasks)-1] ts.tasks = ts.tasks[:len(ts.tasks)-1] @@ -67,11 +87,54 @@ func (ts *TaskStack) Pop() Task { } // Push indicates to push one task into the stack. -func (ts *TaskStack) Push(one Task) { +func (ts *taskStack) Push(one Task) { ts.tasks = append(ts.tasks, one) } // Empty indicates whether taskStack is empty. -func (ts *TaskStack) Empty() bool { +func (ts *taskStack) Empty() bool { + return ts.Len() == 0 +} + +// BenchTest required. +func newTaskStackWithCap(c int) *taskStack { + return &taskStack{ + tasks: make([]Task, 0, c), + } +} + +// TaskStack2 is used to store the optimizing tasks created before or during the optimizing process. +type taskStack2 struct { + tasks []*Task +} + +func newTaskStack2WithCap(c int) *taskStack2 { + return &taskStack2{ + tasks: make([]*Task, 0, c), + } +} + +// Push indicates to push one task into the stack. +func (ts *taskStack2) Push(one Task) { + ts.tasks = append(ts.tasks, &one) +} + +// Len indicates the length of current stack. +func (ts *taskStack2) Len() int { + return len(ts.tasks) +} + +// Empty indicates whether taskStack is empty. +func (ts *taskStack2) Empty() bool { return ts.Len() == 0 } + +// Pop indicates to pop one task out of the stack. +func (ts *taskStack2) Pop() Task { + if !ts.Empty() { + tmp := ts.tasks[len(ts.tasks)-1] + ts.tasks = ts.tasks[:len(ts.tasks)-1] + return *tmp + } + return nil +} diff --git a/pkg/planner/memo/task_scheduler.go b/pkg/planner/memo/task_scheduler.go new file mode 100644 index 0000000000000..dddfbdbe69957 --- /dev/null +++ b/pkg/planner/memo/task_scheduler.go @@ -0,0 +1,55 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memo + +var _ TaskScheduler = &SimpleTaskScheduler{} + +// TaskScheduler is a scheduling interface defined for serializing(single thread)/concurrent(multi thread) running. +type TaskScheduler interface { + ExecuteTasks() +} + +// SimpleTaskScheduler is defined for serializing scheduling of memo tasks. +type SimpleTaskScheduler struct { + Err error + SchedulerCtx TaskSchedulerContext +} + +// ExecuteTasks implements the interface of TaskScheduler. +func (s *SimpleTaskScheduler) ExecuteTasks() { + stack := s.SchedulerCtx.getStack() + defer func() { + // when step out of the scheduler, if the stack is empty, clean and release it. + if !stack.Empty() { + stack.Destroy() + } + }() + for !stack.Empty() { + // when use customized stack to drive the tasks, the call-chain state is dived in the stack. + task := stack.Pop() + if err := task.execute(); err != nil { + s.Err = err + return + } + } +} + +// TaskSchedulerContext is defined for scheduling logic calling, also facilitate interface-oriented coding and testing. +type TaskSchedulerContext interface { + // we exported the Stack interface here rather than the basic stack implementation. + getStack() Stack + // we exported the only one push action to user, Task is an interface definition. + pushTask(task Task) +} diff --git a/pkg/planner/memo/task_scheduler_test.go b/pkg/planner/memo/task_scheduler_test.go new file mode 100644 index 0000000000000..8a596c107d855 --- /dev/null +++ b/pkg/planner/memo/task_scheduler_test.go @@ -0,0 +1,70 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memo + +import ( + "errors" + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestSchedulerContext is defined to test scheduling logic here. +type TestSchedulerContext struct { + ts *taskStack +} + +func (t *TestSchedulerContext) getStack() Stack { + return t.ts +} + +func (t *TestSchedulerContext) pushTask(task Task) { + t.ts.Push(task) +} + +// TestSchedulerContext is defined to mock special error state in specified task. +type TestTaskImpl2 struct { + a int64 +} + +func (t *TestTaskImpl2) execute() error { + // mock error at special task + if t.a == 2 { + return errors.New("mock error at task id = 2") + } + return nil +} + +func (t *TestTaskImpl2) desc() string { + return strconv.Itoa(int(t.a)) +} + +func TestSimpleTaskScheduler(t *testing.T) { + testSchedulerContext := &TestSchedulerContext{ + newTaskStack(), + } + testScheduler := &SimpleTaskScheduler{ + SchedulerCtx: testSchedulerContext, + } + testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 1}) + testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 2}) + testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 3}) + + var testTaskScheduler TaskScheduler = testScheduler + testTaskScheduler.ExecuteTasks() + require.NotNil(t, testScheduler.Err) + require.Equal(t, testScheduler.Err.Error(), "mock error at task id = 2") +} diff --git a/pkg/planner/memo/task_test.go b/pkg/planner/memo/task_test.go index d14d39ccb4661..1f1aa3b39c496 100644 --- a/pkg/planner/memo/task_test.go +++ b/pkg/planner/memo/task_test.go @@ -51,45 +51,87 @@ func TestTaskStack(t *testing.T) { func TestTaskFunctionality(t *testing.T) { taskTaskPool := TaskStackPool.Get() - require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 0) - require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4) - taskStack := taskTaskPool.(*TaskStack) - taskStack.Push(&TestTaskImpl{a: 1}) - taskStack.Push(&TestTaskImpl{a: 2}) - one := taskStack.Pop() + require.Equal(t, len(taskTaskPool.(*taskStack).tasks), 0) + require.Equal(t, cap(taskTaskPool.(*taskStack).tasks), 4) + ts := taskTaskPool.(*taskStack) + ts.Push(&TestTaskImpl{a: 1}) + ts.Push(&TestTaskImpl{a: 2}) + one := ts.Pop() require.Equal(t, one.desc(), "2") - one = taskStack.Pop() + one = ts.Pop() require.Equal(t, one.desc(), "1") // empty, pop nil. - one = taskStack.Pop() + one = ts.Pop() require.Nil(t, one) - taskStack.Push(&TestTaskImpl{a: 3}) - taskStack.Push(&TestTaskImpl{a: 4}) - taskStack.Push(&TestTaskImpl{a: 5}) - taskStack.Push(&TestTaskImpl{a: 6}) + ts.Push(&TestTaskImpl{a: 3}) + ts.Push(&TestTaskImpl{a: 4}) + ts.Push(&TestTaskImpl{a: 5}) + ts.Push(&TestTaskImpl{a: 6}) // no clean, put it back TaskStackPool.Put(taskTaskPool) // require again. - taskTaskPool = TaskStackPool.Get() - require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 4) - require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4) + ts = TaskStackPool.Get().(*taskStack) + require.Equal(t, len(ts.tasks), 4) + require.Equal(t, cap(ts.tasks), 4) // clean the stack - one = taskStack.Pop() + one = ts.Pop() require.Equal(t, one.desc(), "6") - one = taskStack.Pop() + one = ts.Pop() require.Equal(t, one.desc(), "5") - one = taskStack.Pop() + one = ts.Pop() require.Equal(t, one.desc(), "4") - one = taskStack.Pop() + one = ts.Pop() require.Equal(t, one.desc(), "3") - one = taskStack.Pop() + one = ts.Pop() require.Nil(t, one) // self destroy. - taskStack.Destroy() - taskTaskPool = TaskStackPool.Get() - require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 0) - require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4) + ts.Destroy() + ts = TaskStackPool.Get().(*taskStack) + require.Equal(t, len(ts.tasks), 0) + require.Equal(t, cap(ts.tasks), 4) +} + +// Benchmark result explanation: +// On the right side of the function name, you have four values, 43803,27569 ns/op,24000 B/op and 2000 allocs/op +// The former indicates the total number of times the loop was executed, while the latter is the average amount +// of time each iteration took to complete, expressed in nanoseconds per operation. The third is the costed Byte +// of each op, the last one is number of allocs of each op. + +// BenchmarkTestStack2Pointer-8 43802 27569 ns/op 24000 B/op 2000 allocs/op +// BenchmarkTestStack2Pointer-8 42889 27017 ns/op 24000 B/op 2000 allocs/op +// BenchmarkTestStack2Pointer-8 43009 27524 ns/op 24000 B/op 2000 allocs/op +func BenchmarkTestStack2Pointer(b *testing.B) { + stack := newTaskStack2WithCap(1000) + fill := func() { + for idx := int64(0); idx < 1000; idx++ { + stack.Push(&TestTaskImpl{a: idx}) + } + for idx := int64(0); idx < 1000; idx++ { + stack.Pop() + } + } + for i := 0; i < b.N; i++ { + fill() + } +} + +// BenchmarkTestStackInterface-8 108644 10736 ns/op 8000 B/op 1000 allocs/op +// BenchmarkTestStackInterface-8 110587 10756 ns/op 8000 B/op 1000 allocs/op +// BenchmarkTestStackInterface-8 109136 10850 ns/op 8000 B/op 1000 allocs/op +func BenchmarkTestStackInterface(b *testing.B) { + stack := newTaskStackWithCap(1000) + fill := func() { + for idx := int64(0); idx < 1000; idx++ { + stack.Push(&TestTaskImpl{a: idx}) + } + for idx := int64(0); idx < 1000; idx++ { + stack.Pop() + } + } + for i := 0; i < b.N; i++ { + fill() + } } diff --git a/pkg/planner/property/logical_property.go b/pkg/planner/property/logical_property.go index d7bbf132979ef..ffa8e90b24796 100644 --- a/pkg/planner/property/logical_property.go +++ b/pkg/planner/property/logical_property.go @@ -26,3 +26,5 @@ type LogicalProperty struct { Schema *expression.Schema MaxOneRow bool } + +// todo: ScalarProperty: usedColumns in current scalar expr, null reject, cor-related, subq contained and so on