Skip to content

Commit

Permalink
executor: support group_concat under new aggregation evaluation frame…
Browse files Browse the repository at this point in the history
…work (#7032)
  • Loading branch information
XuHuaiyu authored and zz-jason committed Jul 17, 2018
1 parent 533c777 commit b29d52b
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 1 deletion.
3 changes: 3 additions & 0 deletions executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ var (
_ AggFunc = (*avgOriginal4DistinctFloat64)(nil)

// All the AggFunc implementations for "GROUP_CONCAT" are listed here.
_ AggFunc = (*groupConcatDistinct)(nil)
_ AggFunc = (*groupConcat)(nil)

// All the AggFunc implementations for "BIT_OR" are listed here.
_ AggFunc = (*bitOrUint64)(nil)

Expand Down
32 changes: 31 additions & 1 deletion executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
package aggfuncs

import (
"fmt"
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -187,7 +190,34 @@ func buildMaxMin(aggFuncDesc *aggregation.AggFuncDesc, ordinal int, isMax bool)

// buildGroupConcat builds the AggFunc implementation for function "GROUP_CONCAT".
//
// It returns nil whenever the function cannot be handled here:
//   - any argument does not evaluate to a string,
//   - the aggregation runs in DedupMode,
//   - the argument list is empty or its last element (the separator) is not
//     an *expression.Constant.
//
// Otherwise it returns a *groupConcatDistinct when aggFuncDesc.HasDistinct is
// set and a *groupConcat when it is not. The last argument is consumed as the
// separator; the remaining arguments are the values to concatenate.
func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
	// TODO: There might be different kind of types of the args,
	// we should add CastAsString upon every arg after cast can be pushed down to coprocessor.
	// And this check can be removed at that time.
	for _, arg := range aggFuncDesc.Args {
		if arg.GetType().EvalType() != types.ETString {
			return nil
		}
	}
	if aggFuncDesc.Mode == aggregation.DedupMode {
		return nil
	}

	// The separator travels as the trailing argument. Guard the indexing and
	// the type assertion explicitly so that a malformed descriptor is reported
	// as "unsupported" (nil) instead of crashing with an index-out-of-range or
	// a nil pointer dereference.
	numArgs := len(aggFuncDesc.Args)
	if numArgs == 0 {
		return nil
	}
	sepConst, ok := aggFuncDesc.Args[numArgs-1].(*expression.Constant)
	if !ok {
		return nil
	}

	// The separator is expected to be a not-null string constant, so the
	// isNull result is deliberately ignored.
	sep, _, err := sepConst.EvalString(nil, nil)
	// This err should never happen.
	if err != nil {
		panic(fmt.Sprintf("Error happened when buildGroupConcat: %s", errors.Trace(err).Error()))
	}

	base := baseAggFunc{
		args:    aggFuncDesc.Args[:numArgs-1],
		ordinal: ordinal,
	}
	if aggFuncDesc.HasDistinct {
		return &groupConcatDistinct{baseGroupConcat4String{baseAggFunc: base, sep: sep}}
	}
	return &groupConcat{baseGroupConcat4String{baseAggFunc: base, sep: sep}}
}

// buildBitOr builds the AggFunc implementation for function "BIT_OR".
Expand Down
144 changes: 144 additions & 0 deletions executor/aggfuncs/func_group_concat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
"bytes"

"github.com/juju/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

// baseGroupConcat4String holds the state shared by the GROUP_CONCAT
// implementations in this file: the embedded baseAggFunc and the separator.
type baseGroupConcat4String struct {
baseAggFunc

// sep is the separator string written between the concatenated rows.
sep string
}

// AppendFinalResult2Chunk writes the aggregated value into column e.ordinal of
// chk: the accumulated string when the partial result owns a buffer, NULL when
// the buffer is nil. It always returns a nil error.
func (e *baseGroupConcat4String) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	result := (*partialResult4GroupConcat)(pr)
	if result.buffer != nil {
		chk.AppendString(e.ordinal, result.buffer.String())
		return nil
	}
	// A nil buffer means nothing has been accumulated for this group.
	chk.AppendNull(e.ordinal)
	return nil
}

// basePartialResult4GroupConcat is the part of the partial result shared by
// the GROUP_CONCAT variants in this file.
type basePartialResult4GroupConcat struct {
// buffer accumulates the concatenated output. It stays nil until the first
// value is written; a nil buffer is emitted as NULL in the final result.
buffer *bytes.Buffer
}

// partialResult4GroupConcat is the partial result used by groupConcat; it
// carries nothing beyond the shared basePartialResult4GroupConcat.
type partialResult4GroupConcat struct {
basePartialResult4GroupConcat
}

// groupConcat is the GROUP_CONCAT implementation that buildGroupConcat returns
// when the aggregate has no DISTINCT.
type groupConcat struct {
baseGroupConcat4String
}

// AllocPartialResult returns a fresh, zero-valued partial result (its buffer
// is nil) for groupConcat.
func (e *groupConcat) AllocPartialResult() PartialResult {
	p := &partialResult4GroupConcat{}
	return PartialResult(p)
}

// ResetPartialResult discards everything accumulated so far by dropping the
// buffer, returning pr to its freshly-allocated state.
func (e *groupConcat) ResetPartialResult(pr PartialResult) {
	(*partialResult4GroupConcat)(pr).buffer = nil
}

// UpdatePartialResult folds rowsInGroup into the partial result pr.
//
// For every row, the non-NULL values of e.args are appended back to back;
// rows are separated from each other by e.sep. A row whose arguments are all
// NULL contributes nothing, not even a separator. If no non-NULL value has
// ever been seen, the buffer stays nil so the final result is NULL.
//
// The separator is written in front of a row (when earlier output exists)
// rather than appended after it and truncated at the end. This makes the
// function safe for empty or all-NULL input, where there is no buffer to
// truncate, and safe to call repeatedly on the same partial result: batches
// stay separated and existing output is never cut off.
//
// It returns the (traced) error of the first argument evaluation that fails.
func (e *groupConcat) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
	p := (*partialResult4GroupConcat)(pr)
	var (
		v      string
		isNull bool
	)
	for _, row := range rowsInGroup {
		// rowStarted flips to true once the first non-NULL value of this row
		// is about to be written; that is the moment the separator is due.
		rowStarted := false
		for _, arg := range e.args {
			v, isNull, err = arg.EvalString(sctx, row)
			if err != nil {
				return errors.Trace(err)
			}
			if isNull {
				continue
			}
			if !rowStarted {
				rowStarted = true
				if p.buffer == nil {
					// Very first row of the group: no separator needed.
					p.buffer = &bytes.Buffer{}
				} else {
					p.buffer.WriteString(e.sep)
				}
			}
			p.buffer.WriteString(v)
		}
	}
	// TODO: if total length is greater than global var group_concat_max_len, truncate it.
	// issue: #7034
	return nil
}

// partialResult4GroupConcatDistinct is the partial result used by
// groupConcatDistinct.
type partialResult4GroupConcatDistinct struct {
basePartialResult4GroupConcat
// valsBuf is a scratch buffer, reset for every row, in which the row's
// argument values are joined.
valsBuf *bytes.Buffer
// valSet records the joined values that have already been written to the
// output buffer, so duplicates can be skipped.
valSet stringSet
}

// groupConcatDistinct is the GROUP_CONCAT implementation that buildGroupConcat
// returns when the aggregate has DISTINCT.
type groupConcatDistinct struct {
baseGroupConcat4String
}

// AllocPartialResult returns a new partial result for groupConcatDistinct with
// an empty scratch buffer and an empty value set; the output buffer is left nil.
func (e *groupConcatDistinct) AllocPartialResult() PartialResult {
	return PartialResult(&partialResult4GroupConcatDistinct{
		valsBuf: &bytes.Buffer{},
		valSet:  newStringSet(),
	})
}

// ResetPartialResult clears the accumulated output and forgets every value
// seen so far by installing a fresh value set. The scratch buffer valsBuf is
// left untouched.
func (e *groupConcatDistinct) ResetPartialResult(pr PartialResult) {
	result := (*partialResult4GroupConcatDistinct)(pr)
	freshSet := newStringSet()
	result.buffer = nil
	result.valSet = freshSet
}

// UpdatePartialResult folds rowsInGroup into the partial result pr, writing
// each distinct row only once.
//
// For every row, the non-NULL values of e.args are joined back to back. The
// joined text is appended to the output, preceded by e.sep when earlier output
// exists, unless an identical argument tuple has already been written.
//
// Two details differ from a plain "dedupe on the joined text" approach:
//   - A row whose arguments are all NULL is skipped entirely, exactly as
//     groupConcat does, so it can neither turn a NULL result into "" nor add
//     a stray separator.
//   - The dedup key prefixes every value with its length, so rows such as
//     ("a", "bc") and ("ab", "c") — which join to the same text — are still
//     recognised as different tuples.
//
// It returns the (traced) error of the first argument evaluation that fails.
func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
	p := (*partialResult4GroupConcatDistinct)(pr)
	var (
		v      string
		isNull bool
		keyBuf bytes.Buffer // per-row dedup key, reused across rows
	)
	for _, row := range rowsInGroup {
		p.valsBuf.Reset()
		keyBuf.Reset()
		hasValue := false
		for _, arg := range e.args {
			v, isNull, err = arg.EvalString(sctx, row)
			if err != nil {
				return errors.Trace(err)
			}
			if isNull {
				continue
			}
			hasValue = true
			// Fixed-width (8 byte, little-endian) length prefix keeps the
			// boundaries between values unambiguous inside the key.
			for n, i := uint64(len(v)), 0; i < 8; i++ {
				keyBuf.WriteByte(byte(n))
				n >>= 8
			}
			keyBuf.WriteString(v)
			p.valsBuf.WriteString(v)
		}
		if !hasValue {
			continue
		}
		key := keyBuf.String()
		if p.valSet.exist(key) {
			continue
		}
		p.valSet.insert(key)
		// write separator
		if p.buffer == nil {
			p.buffer = &bytes.Buffer{}
		} else {
			p.buffer.WriteString(e.sep)
		}
		// write values
		p.buffer.WriteString(p.valsBuf.String())
	}
	// TODO: if total length is greater than global var group_concat_max_len, truncate it.
	// issue: #7034
	return nil
}
3 changes: 3 additions & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ func (s *testSuite) TestGroupConcatAggr(c *C) {

result = tk.MustQuery("select id, group_concat(name SEPARATOR '') from test group by id order by id")
result.Check(testkit.Rows("1 102030", "2 20", "3 200500"))

result = tk.MustQuery("select id, group_concat(name SEPARATOR '123') from test group by id order by id")
result.Check(testkit.Rows("1 101232012330", "2 20", "3 200123500"))
}

func (s *testSuite) TestSelectDistinct(c *C) {
Expand Down

0 comments on commit b29d52b

Please sign in to comment.