Skip to content

Commit

Permalink
planner: prohibit StreamAgg with group keys for TiFlash (#39547)
Browse files Browse the repository at this point in the history
close #39266
  • Loading branch information
fixdb committed Dec 1, 2022
1 parent 7dedfab commit 38b0ab7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 2 deletions.
29 changes: 29 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,35 @@ func TestAggPushDownCountStar(t *testing.T) {
tk.MustQuery("select count(*) from c, o where c.c_id=o.c_id").Check(testkit.Rows("5"))
}

func TestGroupStreamAggOnTiFlash(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("drop table if exists foo")
tk.MustExec("create table foo(a int, b int, c int, d int, primary key(a,b,c,d))")
tk.MustExec("alter table foo set tiflash replica 1")
tk.MustExec("insert into foo values(1,2,3,1),(1,2,3,6),(1,2,3,5)," +
"(1,2,3,2),(1,2,3,4),(1,2,3,7),(1,2,3,3),(1,2,3,0)")
tb := external.GetTableByName(t, tk, "test", "foo")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@tidb_allow_mpp=0")
sql := "select a,b,c,count(*) from foo group by a,b,c order by a,b,c"
tk.MustQuery(sql).Check(testkit.Rows("1 2 3 8"))
rows := tk.MustQuery("explain " + sql).Rows()

for _, row := range rows {
resBuff := bytes.NewBufferString("")
fmt.Fprintf(resBuff, "%s\n", row)
res := resBuff.String()
// StreamAgg with group keys on TiFlash is not supported
if strings.Contains(res, "tiflash") {
require.NotContains(t, res, "StreamAgg")
}
}
}

func TestTiflashEmptyDynamicPruneResult(t *testing.T) {
store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
Expand Down
8 changes: 6 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1743,8 +1743,12 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
t = cop.convertToRootTask(p.ctx)
attachPlan2Task(p, t)
} else {
copTaskType := cop.getStoreType()
partialAgg, finalAgg := p.newPartialAggregate(copTaskType, false)
storeType := cop.getStoreType()
// TiFlash doesn't support Stream Aggregation
if storeType == kv.TiFlash && len(p.GroupByItems) > 0 {
return invalidTask
}
partialAgg, finalAgg := p.newPartialAggregate(storeType, false)
if partialAgg != nil {
if cop.tablePlan != nil {
cop.finishIndexPlan()
Expand Down

0 comments on commit 38b0ab7

Please sign in to comment.