-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: support group_concat under new aggregation evaluation framework #7032
Changes from 8 commits
701eb4c
6b3c066
437a9e5
d08fd92
91557ef
3ad8478
892709f
4665a59
68aad31
ae7b9b8
6a1ba50
df72247
04c23d7
bf314b2
4c809f8
1129a42
9690a2c
94600fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,10 +14,13 @@ | |
package aggfuncs | ||
|
||
import ( | ||
"github.com/juju/errors" | ||
"github.com/pingcap/tidb/ast" | ||
"github.com/pingcap/tidb/expression" | ||
"github.com/pingcap/tidb/expression/aggregation" | ||
"github.com/pingcap/tidb/mysql" | ||
"github.com/pingcap/tidb/types" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
// Build is used to build a specific AggFunc implementation according to the | ||
|
@@ -116,7 +119,34 @@ func buildMin(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { | |
|
||
// buildCount builds the AggFunc implementation for function "GROUP_CONCAT". | ||
func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc { | ||
return nil | ||
// TODO: There might be different kind of types of the args, | ||
// we should add CastAsString upon every arg after cast can be pushed down to coprocessor. | ||
// And this check can be removed at that time. | ||
for _, arg := range aggFuncDesc.Args { | ||
if arg.GetType().EvalType() != types.ETString { | ||
return nil | ||
} | ||
} | ||
base := baseAggFunc{ | ||
args: aggFuncDesc.Args[:len(aggFuncDesc.Args)-1], | ||
ordinal: ordinal, | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove |
||
switch aggFuncDesc.Mode { | ||
case aggregation.DedupMode: | ||
return nil | ||
default: | ||
// The last arg is promised to be a not-null string constant, so the error can be ignored. | ||
c, _ := aggFuncDesc.Args[len(aggFuncDesc.Args)-1].(*expression.Constant) | ||
sep, _, err := c.EvalString(nil, nil) | ||
// This err will never happen, we check it here for passing the errcheck. | ||
if err != nil { | ||
log.Warning("Error happened when buildGroupConcat:", errors.Trace(err).Error()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should return |
||
} | ||
if aggFuncDesc.HasDistinct { | ||
return &groupConcat4DistinctString{baseGroupConcat4String{baseAggFunc: base, sep: sep}} | ||
} | ||
return &groupConcat4String{baseGroupConcat4String{baseAggFunc: base, sep: sep}} | ||
} | ||
} | ||
|
||
// buildCount builds the AggFunc implementation for function "BIT_OR". | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
// Copyright 2018 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package aggfuncs | ||
|
||
import ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reorganize the import packages |
||
"bytes" | ||
"strings" | ||
|
||
"github.com/juju/errors" | ||
"github.com/pingcap/tidb/sessionctx" | ||
"github.com/pingcap/tidb/util/chunk" | ||
) | ||
|
||
type baseGroupConcat4String struct { | ||
baseAggFunc | ||
|
||
sep string | ||
} | ||
|
||
func (e *baseGroupConcat4String) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put functions belonging to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. baseGroupConcat4String only has one function. |
||
p := (*partialResult4ConcatString)(pr) | ||
if p.buffer == nil { | ||
chk.AppendNull(e.ordinal) | ||
return nil | ||
} | ||
chk.AppendString(e.ordinal, p.buffer.String()) | ||
return nil | ||
} | ||
|
||
type basePartialResult4GroupConcat struct { | ||
buffer *bytes.Buffer | ||
} | ||
|
||
type partialResult4ConcatString struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about |
||
basePartialResult4GroupConcat | ||
} | ||
|
||
type groupConcat4String struct { | ||
baseGroupConcat4String | ||
} | ||
|
||
func (e *groupConcat4String) AllocPartialResult() PartialResult { | ||
return PartialResult(new(partialResult4ConcatString)) | ||
} | ||
|
||
func (e *groupConcat4String) ResetPartialResult(pr PartialResult) { | ||
p := (*partialResult4ConcatString)(pr) | ||
p.buffer = nil | ||
} | ||
|
||
func (e *groupConcat4String) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) { | ||
p := (*partialResult4ConcatString)(pr) | ||
v, isNull := "", false | ||
for _, row := range rowsInGroup { | ||
isWriteSep := false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about s/isWriteSep/writeSep/? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer the current name.😄 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about define |
||
for i, l := 0, len(e.args); i < l; i++ { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
v, isNull, err = e.args[i].EvalString(sctx, row) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
if isNull { | ||
continue | ||
} | ||
This comment was marked as resolved.
Sorry, something went wrong. |
||
isWriteSep = true | ||
if p.buffer == nil { | ||
p.buffer = &bytes.Buffer{} | ||
} | ||
p.buffer.WriteString(v) | ||
} | ||
if isWriteSep { | ||
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
Sorry, something went wrong. |
||
p.buffer.WriteString(e.sep) | ||
} | ||
} | ||
p.buffer.Truncate(p.buffer.Len() - 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can be removed? |
||
// TODO: if total length is greater than global var group_concat_max_len, truncate it. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. make this as an issue and put the issue number into this comment. |
||
// issue: #7034 | ||
return nil | ||
} | ||
|
||
type partialResult4ConcatDistinctString struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about |
||
basePartialResult4GroupConcat | ||
valSet stringSet | ||
} | ||
|
||
type groupConcat4DistinctString struct { | ||
baseGroupConcat4String | ||
} | ||
|
||
func (e *groupConcat4DistinctString) AllocPartialResult() PartialResult { | ||
p := new(partialResult4ConcatDistinctString) | ||
p.valSet = newStringSet() | ||
return PartialResult(p) | ||
} | ||
|
||
func (e *groupConcat4DistinctString) ResetPartialResult(pr PartialResult) { | ||
p := (*partialResult4ConcatDistinctString)(pr) | ||
p.buffer, p.valSet = nil, newStringSet() | ||
} | ||
|
||
func (e *groupConcat4DistinctString) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) { | ||
p := (*partialResult4ConcatDistinctString)(pr) | ||
v, isNull, valsBuf, joinedVals := "", false, make([]string, len(e.args)), "" | ||
for _, row := range rowsInGroup { | ||
for i, l := 0, len(e.args); i < l; i++ { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
v, isNull, err = e.args[i].EvalString(sctx, row) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
if isNull { | ||
continue | ||
} | ||
valsBuf[i] = v | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This valsBuf should be maintained, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make |
||
} | ||
joinedVals = strings.Join(valsBuf, "") | ||
if p.valSet.exist(joinedVals) { | ||
continue | ||
} | ||
p.valSet.insert(joinedVals) | ||
// write separator | ||
if p.buffer == nil { | ||
p.buffer = &bytes.Buffer{} | ||
} else { | ||
p.buffer.WriteString(e.sep) | ||
} | ||
// write values | ||
for _, s := range valsBuf { | ||
p.buffer.WriteString(s) | ||
} | ||
} | ||
// TODO: if total length is greater than global var group_concat_max_len, truncate it. | ||
// issue: #7034 | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
group_concat
always executes on the string inputs, I think we can remove the4String
suffix. How aboutgroupConcat
andgroupConcatDistinct
?