Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, util/ranger: restrict mem usage for index join inner ranges #38129

Merged
merged 10 commits into from
Sep 30, 2022
276 changes: 170 additions & 106 deletions planner/core/exhaust_physical_plans.go

Large diffs are not rendered by default.

249 changes: 216 additions & 33 deletions planner/core/exhaust_physical_plans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,15 @@ func rewriteSimpleExpr(ctx sessionctx.Context, str string, schema *expression.Sc
return filters, nil
}

func TestIndexJoinAnalyzeLookUpFilters(t *testing.T) {
type indexJoinContext struct {
dataSourceNode *DataSource
dsNames types.NameSlice
path *util.AccessPath
joinNode *LogicalJoin
joinColNames types.NameSlice
}

func prepareForAnalyzeLookUpFilters() *indexJoinContext {
ctx := MockContext()

ctx.GetSessionVars().PlanID = -1
Expand Down Expand Up @@ -101,6 +109,10 @@ func TestIndexJoinAnalyzeLookUpFilters(t *testing.T) {
})
dataSourceNode.schema = dsSchema
dataSourceNode.stats = &property.StatsInfo{StatsVersion: statistics.PseudoVersion}
path := &util.AccessPath{
IdxCols: append(make([]*expression.Column, 0, 5), dsSchema.Columns...),
IdxColLens: []int{types.UnspecifiedLength, types.UnspecifiedLength, 2, types.UnspecifiedLength, 2},
}
outerChildSchema := expression.NewSchema()
var outerChildNames types.NameSlice
outerChildSchema.Append(&expression.Column{
Expand Down Expand Up @@ -140,22 +152,65 @@ func TestIndexJoinAnalyzeLookUpFilters(t *testing.T) {
DBName: model.NewCIStr("test"),
})
joinNode.SetSchema(expression.MergeSchema(dsSchema, outerChildSchema))
path := &util.AccessPath{
IdxCols: append(make([]*expression.Column, 0, 5), dsSchema.Columns...),
IdxColLens: []int{types.UnspecifiedLength, types.UnspecifiedLength, 2, types.UnspecifiedLength, 2},
}
joinColNames := append(dsNames.Shallow(), outerChildNames...)
return &indexJoinContext{
dataSourceNode: dataSourceNode,
dsNames: dsNames,
path: path,
joinNode: joinNode,
joinColNames: joinColNames,
}
}

tests := []struct {
innerKeys []*expression.Column
pushedDownConds string
otherConds string
ranges string
idxOff2KeyOff string
accesses string
remained string
compareFilters string
}{
type indexJoinTestCase struct {
// input
innerKeys []*expression.Column
pushedDownConds string
otherConds string
rangeMaxSize int64
rebuildMode bool

// expected output
ranges string
idxOff2KeyOff string
accesses string
remained string
compareFilters string
}

func testAnalyzeLookUpFilters(t *testing.T, testCtx *indexJoinContext, testCase *indexJoinTestCase, msgAndArgs ...interface{}) *indexJoinBuildHelper {
ctx := testCtx.dataSourceNode.ctx
ctx.GetSessionVars().RangeMaxSize = testCase.rangeMaxSize
dataSourceNode := testCtx.dataSourceNode
joinNode := testCtx.joinNode
pushed, err := rewriteSimpleExpr(ctx, testCase.pushedDownConds, dataSourceNode.schema, testCtx.dsNames)
require.NoError(t, err)
dataSourceNode.pushedDownConds = pushed
others, err := rewriteSimpleExpr(ctx, testCase.otherConds, joinNode.schema, testCtx.joinColNames)
require.NoError(t, err)
joinNode.OtherConditions = others
helper := &indexJoinBuildHelper{join: joinNode, lastColManager: nil, innerPlan: dataSourceNode}
_, err = helper.analyzeLookUpFilters(testCtx.path, dataSourceNode, testCase.innerKeys, testCase.innerKeys, testCase.rebuildMode)
if helper.chosenRanges == nil {
helper.chosenRanges = ranger.Ranges{}
}
require.NoError(t, err)
if testCase.rebuildMode {
require.Equal(t, testCase.ranges, fmt.Sprintf("%v", helper.chosenRanges.Range()), msgAndArgs)
} else {
require.Equal(t, testCase.accesses, fmt.Sprintf("%v", helper.chosenAccess), msgAndArgs)
require.Equal(t, testCase.ranges, fmt.Sprintf("%v", helper.chosenRanges.Range()), msgAndArgs)
require.Equal(t, testCase.idxOff2KeyOff, fmt.Sprintf("%v", helper.idxOff2KeyOff), msgAndArgs)
require.Equal(t, testCase.remained, fmt.Sprintf("%v", helper.chosenRemained), msgAndArgs)
require.Equal(t, testCase.compareFilters, fmt.Sprintf("%v", helper.lastColManager), msgAndArgs)
}
return helper
}

func TestIndexJoinAnalyzeLookUpFilters(t *testing.T) {
indexJoinCtx := prepareForAnalyzeLookUpFilters()
dsSchema := indexJoinCtx.dataSourceNode.schema
tests := []indexJoinTestCase{
// Join key not continuous and no pushed filter to match.
{
innerKeys: []*expression.Column{dsSchema.Columns[0], dsSchema.Columns[2]},
Expand Down Expand Up @@ -237,7 +292,7 @@ func TestIndexJoinAnalyzeLookUpFilters(t *testing.T) {
innerKeys: []*expression.Column{dsSchema.Columns[1]},
pushedDownConds: "a in (1, 2, 3) and c in ('a', 'b', 'c')",
otherConds: "",
ranges: "[[1 NULL \"a\",1 NULL \"a\"] [2 NULL \"a\",2 NULL \"a\"] [3 NULL \"a\",3 NULL \"a\"] [1 NULL \"b\",1 NULL \"b\"] [2 NULL \"b\",2 NULL \"b\"] [3 NULL \"b\",3 NULL \"b\"] [1 NULL \"c\",1 NULL \"c\"] [2 NULL \"c\",2 NULL \"c\"] [3 NULL \"c\",3 NULL \"c\"]]",
ranges: "[[1 NULL \"a\",1 NULL \"a\"] [1 NULL \"b\",1 NULL \"b\"] [1 NULL \"c\",1 NULL \"c\"] [2 NULL \"a\",2 NULL \"a\"] [2 NULL \"b\",2 NULL \"b\"] [2 NULL \"c\",2 NULL \"c\"] [3 NULL \"a\",3 NULL \"a\"] [3 NULL \"b\",3 NULL \"b\"] [3 NULL \"c\",3 NULL \"c\"]]",
idxOff2KeyOff: "[-1 0 -1 -1 -1]",
accesses: "[in(Column#1, 1, 2, 3) in(Column#3, a, b, c)]",
remained: "[in(Column#3, a, b, c)]",
Expand All @@ -248,7 +303,7 @@ func TestIndexJoinAnalyzeLookUpFilters(t *testing.T) {
innerKeys: []*expression.Column{dsSchema.Columns[1]},
pushedDownConds: "a in (1, 2, 3) and c in ('a', 'b', 'c')",
otherConds: "d > h and d < h + 100",
ranges: "[[1 NULL \"a\" NULL,1 NULL \"a\" NULL] [2 NULL \"a\" NULL,2 NULL \"a\" NULL] [3 NULL \"a\" NULL,3 NULL \"a\" NULL] [1 NULL \"b\" NULL,1 NULL \"b\" NULL] [2 NULL \"b\" NULL,2 NULL \"b\" NULL] [3 NULL \"b\" NULL,3 NULL \"b\" NULL] [1 NULL \"c\" NULL,1 NULL \"c\" NULL] [2 NULL \"c\" NULL,2 NULL \"c\" NULL] [3 NULL \"c\" NULL,3 NULL \"c\" NULL]]",
ranges: "[[1 NULL \"a\" NULL,1 NULL \"a\" NULL] [1 NULL \"b\" NULL,1 NULL \"b\" NULL] [1 NULL \"c\" NULL,1 NULL \"c\" NULL] [2 NULL \"a\" NULL,2 NULL \"a\" NULL] [2 NULL \"b\" NULL,2 NULL \"b\" NULL] [2 NULL \"c\" NULL,2 NULL \"c\" NULL] [3 NULL \"a\" NULL,3 NULL \"a\" NULL] [3 NULL \"b\" NULL,3 NULL \"b\" NULL] [3 NULL \"c\" NULL,3 NULL \"c\" NULL]]",
idxOff2KeyOff: "[-1 0 -1 -1 -1]",
accesses: "[in(Column#1, 1, 2, 3) in(Column#3, a, b, c) gt(Column#4, Column#9) lt(Column#4, plus(Column#9, 100))]",
remained: "[in(Column#3, a, b, c)]",
Expand Down Expand Up @@ -277,22 +332,150 @@ func TestIndexJoinAnalyzeLookUpFilters(t *testing.T) {
},
}
for i, tt := range tests {
pushed, err := rewriteSimpleExpr(ctx, tt.pushedDownConds, dsSchema, dsNames)
require.NoError(t, err)
dataSourceNode.pushedDownConds = pushed
others, err := rewriteSimpleExpr(ctx, tt.otherConds, joinNode.schema, joinColNames)
require.NoError(t, err)
joinNode.OtherConditions = others
helper := &indexJoinBuildHelper{join: joinNode, lastColManager: nil, innerPlan: dataSourceNode}
_, err = helper.analyzeLookUpFilters(path, dataSourceNode, tt.innerKeys, tt.innerKeys, false)
if helper.chosenRanges == nil {
helper.chosenRanges = ranger.Ranges{}
testAnalyzeLookUpFilters(t, indexJoinCtx, &tt, fmt.Sprintf("test case: %v", i))
}
}

func checkRangeFallbackAndReset(t *testing.T, ctx sessionctx.Context, expectedRangeFallback bool) {
require.Equal(t, expectedRangeFallback, ctx.GetSessionVars().StmtCtx.RangeFallback)
ctx.GetSessionVars().StmtCtx.RangeFallback = false
}

func TestRangeFallbackForAnalyzeLookUpFilters(t *testing.T) {
ijCtx := prepareForAnalyzeLookUpFilters()
ctx := ijCtx.dataSourceNode.ctx
dsSchema := ijCtx.dataSourceNode.schema

type testOutput struct {
ranges string
idxOff2KeyOff string
accesses string
remained string
compareFilters string
}

tests := []struct {
innerKeys []*expression.Column
pushedDownConds string
otherConds string
outputs []testOutput
}{
{
innerKeys: []*expression.Column{dsSchema.Columns[1], dsSchema.Columns[3]},
pushedDownConds: "a in (1, 3) and c in ('aaa', 'bbb')",
otherConds: "",
outputs: []testOutput{
{
ranges: "[[1 NULL \"aa\" NULL,1 NULL \"aa\" NULL] [1 NULL \"bb\" NULL,1 NULL \"bb\" NULL] [3 NULL \"aa\" NULL,3 NULL \"aa\" NULL] [3 NULL \"bb\" NULL,3 NULL \"bb\" NULL]]",
idxOff2KeyOff: "[-1 0 -1 1 -1]",
accesses: "[in(Column#1, 1, 3) in(Column#3, aaa, bbb)]",
remained: "[in(Column#3, aaa, bbb)]",
compareFilters: "<nil>",
},
{
ranges: "[[1 NULL \"aa\",1 NULL \"aa\"] [1 NULL \"bb\",1 NULL \"bb\"] [3 NULL \"aa\",3 NULL \"aa\"] [3 NULL \"bb\",3 NULL \"bb\"]]",
idxOff2KeyOff: "[-1 0 -1 -1 -1]",
accesses: "[in(Column#1, 1, 3) in(Column#3, aaa, bbb)]",
remained: "[in(Column#3, aaa, bbb)]",
compareFilters: "<nil>",
},
{
ranges: "[[1 NULL,1 NULL] [3 NULL,3 NULL]]",
idxOff2KeyOff: "[-1 0 -1 -1 -1]",
accesses: "[in(Column#1, 1, 3)]",
remained: "[in(Column#3, aaa, bbb)]",
compareFilters: "<nil>",
},
{
ranges: "[]",
idxOff2KeyOff: "[]",
accesses: "[]",
remained: "[]",
compareFilters: "<nil>",
time-and-fate marked this conversation as resolved.
Show resolved Hide resolved
},
},
},
{
// test haveExtraCol
innerKeys: []*expression.Column{dsSchema.Columns[0]},
pushedDownConds: "b in (1, 3, 5)",
otherConds: "c > g and c < concat(g, 'aaa')",
outputs: []testOutput{
{
ranges: "[[NULL 1 NULL,NULL 1 NULL] [NULL 3 NULL,NULL 3 NULL] [NULL 5 NULL,NULL 5 NULL]]",
idxOff2KeyOff: "[0 -1 -1 -1 -1]",
accesses: "[in(Column#2, 1, 3, 5) gt(Column#3, Column#8) lt(Column#3, concat(Column#8, aaa))]",
remained: "[]",
compareFilters: "gt(Column#3, Column#8) lt(Column#3, concat(Column#8, aaa))",
},
{
ranges: "[[NULL 1,NULL 1] [NULL 3,NULL 3] [NULL 5,NULL 5]]",
idxOff2KeyOff: "[0 -1 -1 -1 -1]",
accesses: "[in(Column#2, 1, 3, 5)]",
remained: "[]",
compareFilters: "<nil>",
},
{
ranges: "[[NULL,NULL]]",
idxOff2KeyOff: "[0 -1 -1 -1 -1]",
accesses: "[]",
remained: "[in(Column#2, 1, 3, 5)]",
compareFilters: "<nil>",
},
},
},
{
// test nextColRange
innerKeys: []*expression.Column{dsSchema.Columns[1]},
pushedDownConds: "a in (1, 3) and c > 'aaa' and c < 'bbb'",
otherConds: "",
outputs: []testOutput{
{
ranges: "[[1 NULL \"aa\",1 NULL \"bb\"] [3 NULL \"aa\",3 NULL \"bb\"]]",
idxOff2KeyOff: "[-1 0 -1 -1 -1]",
accesses: "[in(Column#1, 1, 3) gt(Column#3, aaa) lt(Column#3, bbb)]",
remained: "[gt(Column#3, aaa) lt(Column#3, bbb)]",
compareFilters: "<nil>",
},
{
ranges: "[[1 NULL,1 NULL] [3 NULL,3 NULL]]",
idxOff2KeyOff: "[-1 0 -1 -1 -1]",
accesses: "[in(Column#1, 1, 3)]",
remained: "[gt(Column#3, aaa) lt(Column#3, bbb)]",
compareFilters: "<nil>",
},
},
},
}
for _, tt := range tests {
ijCase := &indexJoinTestCase{
innerKeys: tt.innerKeys,
pushedDownConds: tt.pushedDownConds,
otherConds: tt.otherConds,
rangeMaxSize: 0,
}
require.NoError(t, err)
require.Equal(t, tt.accesses, fmt.Sprintf("%v", helper.chosenAccess))
require.Equal(t, tt.ranges, fmt.Sprintf("%v", helper.chosenRanges.Range()), "test case: ", i)
require.Equal(t, tt.idxOff2KeyOff, fmt.Sprintf("%v", helper.idxOff2KeyOff))
require.Equal(t, tt.remained, fmt.Sprintf("%v", helper.chosenRemained))
require.Equal(t, tt.compareFilters, fmt.Sprintf("%v", helper.lastColManager))
for i, res := range tt.outputs {
ijCase.ranges = res.ranges
ijCase.idxOff2KeyOff = res.idxOff2KeyOff
ijCase.accesses = res.accesses
ijCase.remained = res.remained
ijCase.compareFilters = res.compareFilters
ijHelper := testAnalyzeLookUpFilters(t, ijCtx, ijCase)
checkRangeFallbackAndReset(t, ctx, i > 0)
ijCase.rangeMaxSize = ijHelper.chosenRanges.Range().MemUsage() - 1
}
}

// test that building ranges doesn't have mem limit under rebuild mode
ijCase := &indexJoinTestCase{
innerKeys: []*expression.Column{dsSchema.Columns[0], dsSchema.Columns[2]},
pushedDownConds: "b in (1, 3) and d in (2, 4)",
otherConds: "",
rangeMaxSize: 1,
rebuildMode: true,
ranges: "[[NULL 1 NULL 2,NULL 1 NULL 2] [NULL 1 NULL 4,NULL 1 NULL 4] [NULL 3 NULL 2,NULL 3 NULL 2] [NULL 3 NULL 4,NULL 3 NULL 4]]",
}
ijHelper := testAnalyzeLookUpFilters(t, ijCtx, ijCase)
checkRangeFallbackAndReset(t, ctx, false)
require.Greater(t, ijHelper.chosenRanges.Range().MemUsage(), ijCase.rangeMaxSize)
}
77 changes: 77 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7585,3 +7585,80 @@ func TestExplainAnalyzeDMLCommit(t *testing.T) {
require.NoError(t, err)
tk.MustQuery("select * from t").Check(testkit.Rows())
}

func TestIndexJoinRangeFallback(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(a int, b int, c varchar(10), d varchar(10), index idx_a_b_c_d(a, b, c(2), d(2)))")
tk.MustExec("create table t2(e int, f int, g varchar(10), h varchar(10))")

var input []string
var output []struct {
SQL string
Plan []string
Warn []string
}
integrationSuiteData := core.GetIntegrationSuiteData()
integrationSuiteData.LoadTestCases(t, &input, &output)
for i, tt := range input {
setStmt := strings.HasPrefix(tt, "set")
testdata.OnRecord(func() {
output[i].SQL = tt
if !setStmt {
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows())
output[i].Warn = testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings())
}
})
if setStmt {
tk.MustExec(tt)
} else {
tk.MustQuery(tt).Check(testkit.Rows(output[i].Plan...))
require.Equal(t, output[i].Warn, testdata.ConvertSQLWarnToStrings(tk.Session().GetSessionVars().StmtCtx.GetWarnings()))
}
}
}

func TestPlanCacheForIndexJoinRangeFallback(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`set @@tidb_enable_prepared_plan_cache=1`)
tk.MustExec("set @@tidb_enable_collect_execution_info=0")
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(a int, b varchar(10), c varchar(10), index idx_a_b(a, b))")
tk.MustExec("create table t2(d int)")
tk.MustExec("set @@tidb_opt_range_max_size=1275")
// 1275 is enough for [? a,? a], [? b,? b], [? c,? c] but is not enough for [? aaaaaa,? aaaaaa], [? bbbbbb,? bbbbbb], [? cccccc,? cccccc].
rows := tk.MustQuery("explain format='brief' select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.d where t1.b in ('a', 'b', 'c')").Rows()
require.True(t, strings.Contains(rows[6][4].(string), "range: decided by [eq(test.t1.a, test.t2.d) in(test.t1.b, a, b, c)]"))
tk.MustQuery("show warnings").Check(testkit.Rows())
rows = tk.MustQuery("explain format='brief' select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.d where t1.b in ('aaaaaa', 'bbbbbb', 'cccccc');").Rows()
require.True(t, strings.Contains(rows[6][4].(string), "range: decided by [eq(test.t1.a, test.t2.d)]"))
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Memory capacity of 1275 bytes for 'tidb_opt_range_max_size' exceeded when building ranges. Less accurate ranges such as full range are chosen"))

tk.MustExec("prepare stmt1 from 'select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.d where t1.b in (?, ?, ?)'")
tk.MustExec("set @a='a', @b='b', @c='c'")
tk.MustExec("execute stmt1 using @a, @b, @c")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustExec("set @a='aaaaaa', @b='bbbbbb', @c='cccccc'")
tk.MustExec("execute stmt1 using @a, @b, @c")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustExec("execute stmt1 using @a, @b, @c")
tkProcess := tk.Session().ShowProcess()
ps := []*util.ProcessInfo{tkProcess}
tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps})
rows = tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
// We don't limit range mem usage when rebuilding index join ranges for the cached plan. So [? aaaaaa,? aaaaaa], [? bbbbbb,? bbbbbb], [? cccccc,? cccccc] can be built.
// TODO: use the right range `range: decided by [eq(test.t1.a, test.t2.d) in(test.t1.b, aaaaaa, bbbbbb, cccccc)]` after https://github.com/pingcap/tidb/issues/38269 is fixed.
require.True(t, strings.Contains(rows[6][4].(string), "range: decided by [eq(test.t1.a, test.t2.d) in(test.t1.b, a, b, c)]"))

// Test the plan with range fallback would not be put into cache.
tk.MustExec("prepare stmt2 from 'select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.d where t1.b in (?, ?, ?, ?, ?)'")
tk.MustExec("set @a='a', @b='b', @c='c', @d='d', @e='e'")
tk.MustExec("execute stmt2 using @a, @b, @c, @d, @e")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 Memory capacity of 1275 bytes for 'tidb_opt_range_max_size' exceeded when building ranges. Less accurate ranges such as full range are chosen"))
tk.MustExec("execute stmt2 using @a, @b, @c, @d, @e")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
}
Loading