Skip to content

Commit 83f279d

Browse files
AilinKidti-chi-bot
authored andcommitted
This is an automated cherry-pick of pingcap#49192
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
1 parent 6ed9445 commit 83f279d

File tree

4 files changed

+263
-3
lines changed

4 files changed

+263
-3
lines changed

executor/cte.go

+21
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,32 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
105105

106106
// Close implements the Executor interface.
107107
func (e *CTEExec) Close() (err error) {
108+
<<<<<<< HEAD:executor/cte.go
108109
e.producer.resTbl.Lock()
109110
if !e.producer.closed {
110111
err = e.producer.closeProducer()
111112
}
112113
e.producer.resTbl.Unlock()
114+
=======
115+
func() {
116+
e.producer.resTbl.Lock()
117+
defer e.producer.resTbl.Unlock()
118+
if !e.producer.closed {
119+
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
120+
ok := v.(bool)
121+
if ok {
122+
// mock an oom panic, returning ErrMemoryExceedForQuery for error identification in recovery work.
123+
panic(exeerrors.ErrMemoryExceedForQuery)
124+
}
125+
})
126+
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
127+
// It means you can still read resTbl after call closeProducer().
128+
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
129+
// Separating these three function calls is only to follow the abstraction of the volcano model.
130+
err = e.producer.closeProducer()
131+
}
132+
}()
133+
>>>>>>> 0c7659c1907 (executor: fix deadlock in dml statement with cte when oom panic action was triggered (#49192)):pkg/executor/cte.go
113134
if err != nil {
114135
return err
115136
}

pkg/executor/cte_test.go

+198
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Copyright 2021 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package executor_test
16+
17+
import (
18+
"fmt"
19+
"math/rand"
20+
"slices"
21+
"testing"
22+
23+
"github.com/pingcap/failpoint"
24+
"github.com/pingcap/tidb/pkg/parser/terror"
25+
"github.com/pingcap/tidb/pkg/testkit"
26+
"github.com/pingcap/tidb/pkg/types"
27+
"github.com/stretchr/testify/require"
28+
)
29+
30+
func TestCTEIssue49096(t *testing.T) {
31+
store := testkit.CreateMockStore(t)
32+
tk := testkit.NewTestKit(t, store)
33+
34+
tk.MustExec("use test;")
35+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/mock_cte_exec_panic_avoid_deadlock", "return(true)"))
36+
defer func() {
37+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/mock_cte_exec_panic_avoid_deadlock"))
38+
}()
39+
insertStr := "insert into t1 values(0)"
40+
rowNum := 10
41+
vals := make([]int, rowNum)
42+
vals[0] = 0
43+
for i := 1; i < rowNum; i++ {
44+
v := rand.Intn(100)
45+
vals[i] = v
46+
insertStr += fmt.Sprintf(", (%d)", v)
47+
}
48+
tk.MustExec("drop table if exists t1, t2;")
49+
tk.MustExec("create table t1(c1 int);")
50+
tk.MustExec("create table t2(c1 int);")
51+
tk.MustExec(insertStr)
52+
// should be insert statement, otherwise it couldn't step int resetCTEStorageMap in handleNoDelay func.
53+
sql := "insert into t2 with cte1 as ( " +
54+
"select c1 from t1) " +
55+
"select c1 from cte1 natural join (select * from cte1 where c1 > 0) cte2 order by c1;"
56+
err := tk.ExecToErr(sql)
57+
require.NotNil(t, err)
58+
require.Equal(t, "[executor:8175]Your query has been cancelled due to exceeding the allowed memory limit for a single SQL query. Please try narrowing your query scope or increase the tidb_mem_quota_query limit and try again.[conn=%d]", err.Error())
59+
}
60+
61+
func TestSpillToDisk(t *testing.T) {
62+
store := testkit.CreateMockStore(t)
63+
64+
tk := testkit.NewTestKit(t, store)
65+
tk.MustExec("SET GLOBAL tidb_enable_tmp_storage_on_oom = 1")
66+
defer tk.MustExec("SET GLOBAL tidb_enable_tmp_storage_on_oom = 0")
67+
tk.MustExec("use test;")
68+
69+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/testCTEStorageSpill", "return(true)"))
70+
defer func() {
71+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/testCTEStorageSpill"))
72+
tk.MustExec("set tidb_mem_quota_query = 1073741824;")
73+
}()
74+
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/sortexec/testSortedRowContainerSpill", "return(true)"))
75+
defer func() {
76+
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/sortexec/testSortedRowContainerSpill"))
77+
}()
78+
79+
// Use duplicated rows to test UNION DISTINCT.
80+
tk.MustExec("set tidb_mem_quota_query = 1073741824;")
81+
insertStr := "insert into t1 values(0)"
82+
rowNum := 1000
83+
vals := make([]int, rowNum)
84+
vals[0] = 0
85+
for i := 1; i < rowNum; i++ {
86+
v := rand.Intn(100)
87+
vals[i] = v
88+
insertStr += fmt.Sprintf(", (%d)", v)
89+
}
90+
tk.MustExec("drop table if exists t1;")
91+
tk.MustExec("create table t1(c1 int);")
92+
tk.MustExec(insertStr)
93+
tk.MustExec("set tidb_mem_quota_query = 40000;")
94+
tk.MustExec("set cte_max_recursion_depth = 500000;")
95+
sql := fmt.Sprintf("with recursive cte1 as ( "+
96+
"select c1 from t1 "+
97+
"union "+
98+
"select c1 + 1 c1 from cte1 where c1 < %d) "+
99+
"select c1 from cte1 order by c1;", rowNum)
100+
rows := tk.MustQuery(sql)
101+
102+
memTracker := tk.Session().GetSessionVars().StmtCtx.MemTracker
103+
diskTracker := tk.Session().GetSessionVars().StmtCtx.DiskTracker
104+
require.Greater(t, memTracker.MaxConsumed(), int64(0))
105+
require.Greater(t, diskTracker.MaxConsumed(), int64(0))
106+
107+
slices.Sort(vals)
108+
resRows := make([]string, 0, rowNum)
109+
for i := vals[0]; i <= rowNum; i++ {
110+
resRows = append(resRows, fmt.Sprintf("%d", i))
111+
}
112+
rows.Check(testkit.Rows(resRows...))
113+
}
114+
115+
func TestCTEExecError(t *testing.T) {
116+
store := testkit.CreateMockStore(t)
117+
118+
tk := testkit.NewTestKit(t, store)
119+
tk.MustExec("use test;")
120+
tk.MustExec("drop table if exists src;")
121+
tk.MustExec("create table src(first int, second int);")
122+
123+
insertStr := fmt.Sprintf("insert into src values (%d, %d)", rand.Intn(1000), rand.Intn(1000))
124+
for i := 0; i < 1000; i++ {
125+
insertStr += fmt.Sprintf(",(%d, %d)", rand.Intn(1000), rand.Intn(1000))
126+
}
127+
insertStr += ";"
128+
tk.MustExec(insertStr)
129+
130+
// Increase projection concurrency and decrease chunk size
131+
// to increase the probability of reproducing the problem.
132+
tk.MustExec("set tidb_max_chunk_size = 32")
133+
tk.MustExec("set tidb_projection_concurrency = 20")
134+
for i := 0; i < 10; i++ {
135+
err := tk.QueryToErr("with recursive cte(iter, first, second, result) as " +
136+
"(select 1, first, second, first+second from src " +
137+
" union all " +
138+
"select iter+1, second, result, second+result from cte where iter < 80 )" +
139+
"select * from cte")
140+
require.True(t, terror.ErrorEqual(err, types.ErrOverflow))
141+
}
142+
}
143+
144+
func TestCTEPanic(t *testing.T) {
145+
store := testkit.CreateMockStore(t)
146+
tk := testkit.NewTestKit(t, store)
147+
tk.MustExec("use test;")
148+
tk.MustExec("create table t1(c1 int)")
149+
tk.MustExec("insert into t1 values(1), (2), (3)")
150+
151+
fpPathPrefix := "github.com/pingcap/tidb/pkg/executor/"
152+
fp := "testCTESeedPanic"
153+
require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp)))
154+
err := tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1")
155+
require.Contains(t, err.Error(), fp)
156+
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))
157+
158+
fp = "testCTERecursivePanic"
159+
require.NoError(t, failpoint.Enable(fpPathPrefix+fp, fmt.Sprintf(`panic("%s")`, fp)))
160+
err = tk.QueryToErr("with recursive cte1 as (select c1 from t1 union all select c1 + 1 from cte1 where c1 < 5) select t_alias_1.c1 from cte1 as t_alias_1 inner join cte1 as t_alias_2 on t_alias_1.c1 = t_alias_2.c1 order by c1")
161+
require.Contains(t, err.Error(), fp)
162+
require.NoError(t, failpoint.Disable(fpPathPrefix+fp))
163+
}
164+
165+
func TestCTEDelSpillFile(t *testing.T) {
166+
store := testkit.CreateMockStore(t)
167+
tk := testkit.NewTestKit(t, store)
168+
tk.MustExec("use test;")
169+
tk.MustExec("drop table if exists t1, t2;")
170+
tk.MustExec("create table t1(c1 int, c2 int);")
171+
tk.MustExec("create table t2(c1 int);")
172+
tk.MustExec("set @@cte_max_recursion_depth = 1000000;")
173+
tk.MustExec("set global tidb_mem_oom_action = 'log';")
174+
tk.MustExec("set @@tidb_mem_quota_query = 100;")
175+
tk.MustExec("insert into t2 values(1);")
176+
tk.MustExec("insert into t1 (c1, c2) with recursive cte1 as (select c1 from t2 union select cte1.c1 + 1 from cte1 where cte1.c1 < 100000) select cte1.c1, cte1.c1+1 from cte1;")
177+
require.Nil(t, tk.Session().GetSessionVars().StmtCtx.CTEStorageMap)
178+
}
179+
180+
func TestCTEShareCorColumn(t *testing.T) {
181+
store := testkit.CreateMockStore(t)
182+
tk := testkit.NewTestKit(t, store)
183+
tk.MustExec("use test;")
184+
tk.MustExec("drop table if exists t1, t2;")
185+
tk.MustExec("create table t1(c1 int, c2 varchar(100));")
186+
tk.MustExec("insert into t1 values(1, '2020-10-10');")
187+
tk.MustExec("create table t2(c1 int, c2 date);")
188+
tk.MustExec("insert into t2 values(1, '2020-10-10');")
189+
for i := 0; i < 100; i++ {
190+
tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias1) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10"))
191+
tk.MustQuery("with cte1 as (select t1.c1, (select t2.c2 from t2 where t2.c2 = str_to_date(t1.c2, '%Y-%m-%d')) from t1 inner join t2 on t1.c1 = t2.c1) select /*+ hash_join_build(alias2) */ * from cte1 alias1 inner join cte1 alias2 on alias1.c1 = alias2.c1;").Check(testkit.Rows("1 2020-10-10 1 2020-10-10"))
192+
}
193+
194+
tk.MustExec("drop table if exists t1;")
195+
tk.MustExec("create table t1(a int);")
196+
tk.MustExec("insert into t1 values(1), (2);")
197+
tk.MustQuery("SELECT * FROM t1 dt WHERE EXISTS( WITH RECURSIVE qn AS (SELECT a AS b UNION ALL SELECT b+1 FROM qn WHERE b=0 or b = 1) SELECT * FROM qn dtqn1 where exists (select /*+ NO_DECORRELATE() */ b from qn where dtqn1.b+1));").Check(testkit.Rows("1", "2"))
198+
}

pkg/util/cteutil/BUILD.bazel

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "cteutil",
5+
srcs = ["storage.go"],
6+
importpath = "github.com/pingcap/tidb/pkg/util/cteutil",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/types",
10+
"//pkg/util/chunk",
11+
"//pkg/util/disk",
12+
"//pkg/util/memory",
13+
"//pkg/util/syncutil",
14+
"@com_github_pingcap_errors//:errors",
15+
],
16+
)
17+
18+
go_test(
19+
name = "cteutil_test",
20+
timeout = "short",
21+
srcs = [
22+
"main_test.go",
23+
"storage_test.go",
24+
],
25+
embed = [":cteutil"],
26+
flaky = True,
27+
deps = [
28+
"//pkg/parser/mysql",
29+
"//pkg/testkit/testsetup",
30+
"//pkg/types",
31+
"//pkg/util/chunk",
32+
"@com_github_stretchr_testify//require",
33+
"@org_uber_go_goleak//:goleak",
34+
],
35+
)

util/cteutil/storage.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@
1515
package cteutil
1616

1717
import (
18-
"sync"
19-
2018
"github.com/pingcap/errors"
19+
<<<<<<< HEAD:util/cteutil/storage.go
2120
"github.com/pingcap/tidb/types"
2221
"github.com/pingcap/tidb/util/chunk"
2322
"github.com/pingcap/tidb/util/disk"
2423
"github.com/pingcap/tidb/util/memory"
24+
=======
25+
"github.com/pingcap/tidb/pkg/types"
26+
"github.com/pingcap/tidb/pkg/util/chunk"
27+
"github.com/pingcap/tidb/pkg/util/disk"
28+
"github.com/pingcap/tidb/pkg/util/memory"
29+
"github.com/pingcap/tidb/pkg/util/syncutil"
30+
>>>>>>> 0c7659c1907 (executor: fix deadlock in dml statement with cte when oom panic action was triggered (#49192)):pkg/util/cteutil/storage.go
2531
)
2632

2733
var _ Storage = &StorageRC{}
@@ -99,7 +105,7 @@ type StorageRC struct {
99105
refCnt int
100106
chkSize int
101107
iter int
102-
mu sync.Mutex
108+
mu syncutil.Mutex
103109
done bool
104110
}
105111

0 commit comments

Comments
 (0)