Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: reset groupCount when Close is called in streamagg executor (#53874) #55701

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/executor/internal/vecgroupchecker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ go_test(
],
embed = [":vecgroupchecker"],
flaky = True,
shard_count = 3,
shard_count = 4,
deps = [
"//pkg/config",
"//pkg/expression",
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/internal/vecgroupchecker/vec_group_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ func (e *VecGroupChecker) IsExhausted() bool {
func (e *VecGroupChecker) Reset() {
if e.groupOffset != nil {
e.groupOffset = e.groupOffset[:0]
e.groupCount = 0
}
if e.sameGroup != nil {
e.sameGroup = e.sameGroup[:0]
Expand Down
10 changes: 10 additions & 0 deletions pkg/executor/internal/vecgroupchecker/vec_group_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,13 @@ func TestVecGroupChecker(t *testing.T) {
require.Equal(t, e, 3)
require.True(t, groupChecker.IsExhausted())
}

func TestIssue53867(t *testing.T) {
checker := NewVecGroupChecker(nil, true, nil)
checker.groupOffset = make([]int, 20)
checker.nextGroupID = 10
checker.groupCount = 15
require.False(t, checker.IsExhausted())
checker.Reset()
require.True(t, checker.IsExhausted())
}
2 changes: 1 addition & 1 deletion pkg/executor/test/issuetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//pkg/autoid_service",
"//pkg/config",
Expand Down
17 changes: 17 additions & 0 deletions pkg/executor/test/issuetest/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,3 +676,20 @@ func TestIssue52978(t *testing.T) {
tk.MustQuery("select min(truncate(cast(-26340 as double), ref_11.a)) as c3 from t as ref_11;").Check(testkit.Rows("-26340"))
tk.MustExec("drop table if exists t")
}

func TestIssue53867(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t_xf1at0 (c_weg int ,c_u text ,c_bu double ,c__icnfdo_ tinyint ,c_micv4b_95 text ,c_ngdu3 int not null ,c_curc3h double ,c_menn1dk double ,c_pv int unique ,c_u3zry tinyint ,primary key(c_ngdu3, c_pv) NONCLUSTERED) shard_row_id_bits=4 pre_split_regions=2;")
tk.MustExec("create index t_kp1_idx_1 on t_xf1at0 (c_weg, c_ngdu3, c_pv);")
tk.MustExec("insert into t_xf1at0 (c_weg, c_ngdu3, c_pv) values (0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9), (10, 10, 10);")
tk.MustExec("create table t_bhze93f (c_x393ej_ int ,c_k3kss19 double not null ,c_q6qt9_ int not null ,c_wp7o_0sstj text ,c_gus881_9 double unique ,c_wzmb0 text ,primary key(c_q6qt9_) NONCLUSTERED) shard_row_id_bits=4 pre_split_regions=2;")
tk.MustExec("insert into t_bhze93f (c_x393ej_, c_k3kss19, c_q6qt9_, c_wp7o_0sstj, c_gus881_9, c_wzmb0) values(-1458322912, 5.32, -811171723, cast(null as char), 96.15, 'r0exs4umz5'),(cast(null as signed), 57.87, -1624364959, 't968', 35.92, 'f'),(1237193156, 81.49, -744800718, 'pfcascv16e', cast(null as double), 'pv34_'),(1955355436, 16.26, -1560468978, cast(null as char), 65537.1, 'bps');")
tk.MustExec("create table t_b0t (c_g int ,c_ci4cf5ns tinyint ,c_at double ,c_xb int ,c_uyu4fop36b double ,c_zhouc text ,c_m9g6b tinyint not null unique ,c_k7rlw47ob tinyint unique ,primary key(c_xb) CLUSTERED) pre_split_regions=2;")

// Need no panic
tk.MustQuery("select /*+ STREAM_AGG() */ (ref_4.c_k3kss19 / ref_4.c_k3kss19) as c2 from t_bhze93f as ref_4 where (EXISTS (select ref_5.c_wp7o_0sstj as c0 from t_bhze93f as ref_5 where (207007502 < (select distinct ref_6.c_weg as c0 from t_xf1at0 as ref_6 union all (select ref_7.c_xb as c0 from t_b0t as ref_7 where (-16090 != ref_4.c_x393ej_)) limit 1)) limit 1));")
}