Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add decorrelate-apply test & add support for move out intermediary GE from memo. #59171

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/planner/cascades/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ go_library(
"//pkg/planner/cascades/base",
"//pkg/planner/cascades/base/cascadesctx",
"//pkg/planner/cascades/memo",
"//pkg/planner/cascades/rule",
"//pkg/planner/cascades/task",
"//pkg/planner/core/base",
"//pkg/util/intest",
"@com_github_bits_and_blooms_bitset//:bitset",
],
)

Expand Down
1 change: 1 addition & 0 deletions pkg/planner/cascades/base/cascadesctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ go_library(
deps = [
"//pkg/planner/cascades/base",
"//pkg/planner/cascades/memo",
"@com_github_bits_and_blooms_bitset//:bitset",
],
)
2 changes: 2 additions & 0 deletions pkg/planner/cascades/base/cascadesctx/cascades_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package cascadesctx

import (
"github.com/bits-and-blooms/bitset"
"github.com/pingcap/tidb/pkg/planner/cascades/base"
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
)
Expand All @@ -29,4 +30,5 @@ type Context interface {
GetScheduler() base.Scheduler
PushTask(task base.Task)
GetMemo() *memo.Memo
GetRuleMask() *bitset.BitSet
}
20 changes: 20 additions & 0 deletions pkg/planner/cascades/cascades.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package cascades

import (
"github.com/bits-and-blooms/bitset"
"github.com/pingcap/tidb/pkg/planner/cascades/base"
"github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx"
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
"github.com/pingcap/tidb/pkg/planner/cascades/rule"
"github.com/pingcap/tidb/pkg/planner/cascades/task"
corebase "github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/util/intest"
)

// MaximumRuleLength is an exported package-level uint declared without an
// initializer, so it starts at zero.
// NOTE(review): nothing visible in this file reads or writes it; NewContext
// sizes the rule mask from rule.XFMaximumRuleLength instead. Presumably this
// is a leftover or a future upper bound on rule IDs — confirm or remove.
var MaximumRuleLength uint

// Optimizer is a basic cascades search framework portal, driven by Context.
type Optimizer struct {
logic corebase.LogicalPlan
Expand Down Expand Up @@ -60,6 +64,13 @@ func (c *Optimizer) GetMemo() *memo.Memo {
return c.ctx.GetMemo()
}

// SetRules marks the given rule IDs as allowed by setting the corresponding
// bits in the optimizer context's rule mask. Bits that are already set stay
// set; passing a nil or empty slice changes nothing.
func (c *Optimizer) SetRules(ids []uint) {
	// Go through the context interface's GetRuleMask accessor rather than
	// type-asserting to the concrete *Context on every iteration: the
	// assertion panics for any other context implementation, and the lookup
	// is loop-invariant anyway.
	mask := c.ctx.GetRuleMask()
	for _, id := range ids {
		mask.Set(id)
	}
}

// Context includes all the context stuff when go through memo optimizing.
type Context struct {
// pctx variable awareness.
Expand All @@ -68,6 +79,8 @@ type Context struct {
mm *memo.Memo
// task pool management.
scheduler base.Scheduler
// rule mask.
ruleMask *bitset.BitSet
}

// NewContext returns a new memo context responsible for manage all the stuff in cascades opt.
Expand All @@ -78,6 +91,8 @@ func NewContext(pctx corebase.PlanContext) *Context {
mm: memo.NewMemo(pctx.GetSessionVars().StmtCtx.OperatorNum),
// task pool management.
scheduler: task.NewSimpleTaskScheduler(),
// new rule mask.
ruleMask: bitset.New(uint(rule.XFMaximumRuleLength)),
}
}

Expand All @@ -103,3 +118,8 @@ func (c *Context) PushTask(task base.Task) {
func (c *Context) GetMemo() *memo.Memo {
return c.mm
}

// GetRuleMask implements the cascades context interface.
// It returns the context's rule mask bitset itself (not a copy), so changes
// made through the returned pointer are visible to the context.
func (c *Context) GetRuleMask() *bitset.BitSet {
	return c.ruleMask
}
18 changes: 18 additions & 0 deletions pkg/planner/cascades/memo/memo.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,24 @@ func (mm *Memo) CopyIn(target *Group, lp base.LogicalPlan) (*GroupExpression, er
return groupExpr, nil
}

// RemoveOut detaches an old, no-longer-valid group expression from the memo.
// lp must be a *GroupExpression that belongs to target. Callers are expected
// to insert the replacement expression first and only then remove the old one.
func (mm *Memo) RemoveOut(target *Group, lp base.LogicalPlan) {
	intest.Assert(target != nil)
	intest.Assert(lp != nil)
	stale := lp.(*GroupExpression)
	intest.Assert(stale != nil)

	// Unlink the expression from its owning group.
	target.Delete(stale)
	// Unlink it from the memo-wide group-expression hash.
	mm.hash2GlobalGroupExpr.Remove(stale)
	// Keep the parent-GE back references of every input group in sync.
	inputs := stale.Inputs
	for i := range inputs {
		inputs[i].removeParentGEs(stale)
	}
	// An already pushed task may still hold this expression, so flag it as
	// abandoned.
	stale.SetAbandoned()
}

// GetGroups gets all groups in the memo.
func (mm *Memo) GetGroups() *list.List {
return mm.groups
Expand Down
29 changes: 27 additions & 2 deletions pkg/planner/cascades/rule/apply/decorrelate_apply/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "decorrelate_apply",
srcs = ["xf_decorrelate_apply.go"],
srcs = [
"xf_decorrelate_apply_base.go",
"xf_decorrelate_simple_apply.go",
],
importpath = "github.com/pingcap/tidb/pkg/planner/cascades/rule/apply/decorrelate_apply",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -15,3 +18,25 @@ go_library(
"//pkg/util/plancodec",
],
)

go_test(
name = "decorrelate_apply_test",
timeout = "short",
srcs = ["xf_decorrelate_apply_test.go"],
flaky = True,
deps = [
":decorrelate_apply",
"//pkg/expression",
"//pkg/parser/ast",
"//pkg/planner/cascades",
"//pkg/planner/cascades/base",
"//pkg/planner/cascades/memo",
"//pkg/planner/core/base",
"//pkg/planner/core/operator/logicalop",
"//pkg/types",
"//pkg/util/mock",
"//pkg/util/plancodec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package decorrelate_apply

import (
"github.com/pingcap/tidb/pkg/planner/cascades/rule"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
)

// Compile-time check that *XFDeCorrelateApplyBase satisfies rule.Rule.
var _ rule.Rule = &XFDeCorrelateApplyBase{}

// XFDeCorrelateApplyBase embeds *rule.BaseRule and adds a PreCheck that
// rejects Apply operators marked NoDecorrelate.
// NOTE(review): the earlier comment here said it "pulls the correlated
// expression from projection as child of apply"; no such transformation is
// visible on this type, so presumably that text belongs to a concrete rule
// built on top of this base — confirm.
type XFDeCorrelateApplyBase struct {
	*rule.BaseRule
}

// PreCheck implements the Rule interface.
// It unwraps the logical plan carried by applyGE, which must be a
// *logicalop.LogicalApply, and reports whether decorrelation is permitted:
// false when the apply is flagged NoDecorrelate, true otherwise.
func (*XFDeCorrelateApplyBase) PreCheck(applyGE base.LogicalPlan) bool {
	logicalApply := applyGE.GetWrappedLogicalPlan().(*logicalop.LogicalApply)
	return !logicalApply.NoDecorrelate
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package decorrelate_apply_test

import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/planner/cascades"
"github.com/pingcap/tidb/pkg/planner/cascades/base"
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
"github.com/pingcap/tidb/pkg/planner/cascades/rule/apply/decorrelate_apply"
corebase "github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/pingcap/tidb/pkg/util/plancodec"
"github.com/stretchr/testify/require"
)

// TestXFDeCorrelateShouldDeleteIntermediaryApply checks which plans remain in
// the memo after running only the XFDeCorrelateSimpleApply rule.
//
// When a pull-up-predicate transformation is applied to the original Apply it
// produces a temporary Apply. That temporary operator is called an
// intermediary apply and is not the final target we want. Subsequent
// XFDeCorrelate rules are applied to it later; once one of them generates a
// new expression from it, the source intermediary apply should be removed
// from the memo.
//
// Phase 1 runs the rule on an Apply without the generated-by-rule flag and
// expects two plans (the Apply and the derived Join). Phase 2 sets
// ApplyGenFromXFDeCorrelateRuleFlag on the Apply and expects only the Join.
func TestXFDeCorrelateShouldDeleteIntermediaryApply(t *testing.T) {
	// Enable the MockPlanSkipMemoDeriveStats failpoint for the duration of the test.
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats", `return(true)`))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/planner/cascades/memo/MockPlanSkipMemoDeriveStats"))
	}()

	ctx := mock.NewContext()
	// Schema producer for the Apply: one column (ID 1) named "a".
	sp := logicalop.LogicalSchemaProducer{}
	col1 := &expression.Column{
		ID: 1,
	}
	sp.SetSchema(expression.NewSchema(col1))
	name := &types.FieldName{ColName: ast.NewCIStr("a")}
	names := types.NameSlice{name}
	sp.SetOutputNames(names)
	sp.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, "apply", nil, 0)

	// First data source: one column (ID 2) named "b". Note the variable is ds2
	// while the plan is created with the type string "ds1".
	sp2 := logicalop.LogicalSchemaProducer{}
	col2 := &expression.Column{
		ID: 2,
	}
	sp2.SetSchema(expression.NewSchema(col2))
	name = &types.FieldName{ColName: ast.NewCIStr("b")}
	names = types.NameSlice{name}
	sp2.SetOutputNames(names)
	sp2.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, "ds1", nil, 0)
	ds2 := &logicalop.DataSource{
		LogicalSchemaProducer: sp2,
	}
	// Second data source: one column (ID 3) named "c", created with the type string "ds2".
	sp3 := logicalop.LogicalSchemaProducer{}
	col3 := &expression.Column{
		ID: 3,
	}
	sp3.SetSchema(expression.NewSchema(col3))
	name = &types.FieldName{ColName: ast.NewCIStr("c")}
	names = types.NameSlice{name}
	sp3.SetOutputNames(names)
	sp3.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, "ds2", nil, 0)
	ds3 := &logicalop.DataSource{
		LogicalSchemaProducer: sp3,
	}
	// The two data sources must hash differently.
	hasher := base.NewHashEqualer()
	ds2.Hash64(hasher)
	ds2Hash64 := hasher.Sum64()
	hasher.Reset()
	ds3.Hash64(hasher)
	ds3Hash64 := hasher.Sum64()
	require.NotEqual(t, ds2Hash64, ds3Hash64)

	// Build Apply(ds2, ds3) with decorrelation allowed.
	sp.BaseLogicalPlan.SetChildren(ds2, ds3)
	join := &logicalop.LogicalJoin{
		LogicalSchemaProducer: sp,
	}
	apply := &logicalop.LogicalApply{LogicalJoin: *join, NoDecorrelate: false}
	apply.SetSelf(apply)
	apply.SetTP(plancodec.TypeApply)
	// NOTE(review): mm is a standalone memo, distinct from the one returned by
	// cas.GetMemo() below. Presumably mm.Init rewrites apply's children, which
	// is why they are restored before phase 2 — confirm.
	mm := memo.NewMemo()
	mm.Init(apply)
	myRule := decorrelate_apply.NewXFDeCorrelateSimpleApply()
	cas, err := cascades.NewOptimizer(apply)
	require.Nil(t, err)
	// Only allow the XFDeCorrelateSimpleApply rule.
	cas.SetRules([]uint{myRule.ID()})
	defer cas.Destroy()
	err = cas.Execute()
	require.Nil(t, err)

	length := 0
	// Phase 1: the memo should yield exactly 2 logical plans, the Apply first
	// and then the Join, both over the same two data sources.
	cas.GetMemo().NewIterator().Each(func(plan corebase.LogicalPlan) bool {
		if length == 0 {
			// This inner apply shadows the outer one inside this branch only.
			apply := plan.(*logicalop.LogicalApply)
			require.True(t, apply != nil)
			require.True(t, apply.Schema().Columns[0].ID == 1)
			require.True(t, apply.Self() == apply)
			require.True(t, apply.TP() == plancodec.TypeApply)

			ds1 := plan.Children()[0].(*logicalop.DataSource)
			require.True(t, ds1 != nil)
			require.True(t, ds1.Schema().Columns[0].ID == 2)
			ds2 := plan.Children()[1].(*logicalop.DataSource)
			require.True(t, ds2 != nil)
			require.True(t, ds2.Schema().Columns[0].ID == 3)
		} else if length == 1 {
			join := plan.(*logicalop.LogicalJoin)
			require.True(t, join != nil)
			require.True(t, join.Schema().Columns[0].ID == 1)
			require.True(t, join.Schema().Columns[0] == apply.Schema().Columns[0]) // same column object as the outer apply's schema
			require.True(t, join.Self() == join)
			require.True(t, join.TP() == plancodec.TypeJoin)

			ds1 := plan.Children()[0].(*logicalop.DataSource)
			require.True(t, ds1 != nil)
			require.True(t, ds1.Schema().Columns[0].ID == 2)
			ds2 := plan.Children()[1].(*logicalop.DataSource)
			require.True(t, ds2 != nil)
			require.True(t, ds2.Schema().Columns[0].ID == 3)
		}
		length++
		return true
	})
	require.True(t, length == 2)

	// Phase 2: restore the original plan tree and mark the Apply as generated
	// by an XFDeCorrelate rule, i.e. as an intermediary apply.
	apply.BaseLogicalPlan.SetChildren(ds2, ds3)
	apply.SetFlag(logicalop.ApplyGenFromXFDeCorrelateRuleFlag)
	mm.Destroy()
	mm.Init(apply)
	myRule = decorrelate_apply.NewXFDeCorrelateSimpleApply()
	cas, err = cascades.NewOptimizer(apply)
	require.Nil(t, err)
	// Only allow the XFDeCorrelateSimpleApply rule.
	cas.SetRules([]uint{myRule.ID()})
	// The receiver is evaluated when defer runs, so this destroys the second
	// optimizer; the earlier defer still destroys the first one.
	defer cas.Destroy()
	err = cas.Execute()
	require.Nil(t, err)

	// The memo should now yield exactly 1 logical plan: only the Join, with
	// the flagged intermediary Apply gone.
	length = 0
	cas.GetMemo().NewIterator().Each(func(plan corebase.LogicalPlan) bool {
		if length == 0 {
			join := plan.(*logicalop.LogicalJoin)
			require.True(t, join != nil)
			require.True(t, join.Schema().Columns[0].ID == 1)
			require.True(t, join.Schema().Columns[0] == apply.Schema().Columns[0]) // same column object as the apply's schema
			require.True(t, join.Self() == join)
			require.True(t, join.TP() == plancodec.TypeJoin)

			ds1 := plan.Children()[0].(*logicalop.DataSource)
			require.True(t, ds1 != nil)
			require.True(t, ds1.Schema().Columns[0].ID == 2)
			ds2 := plan.Children()[1].(*logicalop.DataSource)
			require.True(t, ds2 != nil)
			require.True(t, ds2.Schema().Columns[0].ID == 3)
		}
		length++
		return true
	})
	require.True(t, length == 1)
}
Loading