Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/pingcap/tidb into add_adv…
Browse files Browse the repository at this point in the history
…ertise-address_to_config
  • Loading branch information
crazycs520 committed Jul 17, 2018
2 parents f9a1cd7 + b29d52b commit 726ebe2
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 10 deletions.
2 changes: 1 addition & 1 deletion config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# TiDB server host.
host = "0.0.0.0"

# tidb server advertise IP
# tidb server advertise IP.
advertise-ip = ""

# TiDB server port.
Expand Down
3 changes: 3 additions & 0 deletions executor/aggfuncs/aggfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ var (
_ AggFunc = (*avgOriginal4DistinctFloat64)(nil)

// All the AggFunc implementations for "GROUP_CONCAT" are listed here.
_ AggFunc = (*groupConcatDistinct)(nil)
_ AggFunc = (*groupConcat)(nil)

// All the AggFunc implementations for "BIT_OR" are listed here.
_ AggFunc = (*bitOrUint64)(nil)

Expand Down
32 changes: 31 additions & 1 deletion executor/aggfuncs/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
package aggfuncs

import (
"fmt"
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -187,7 +190,34 @@ func buildMaxMin(aggFuncDesc *aggregation.AggFuncDesc, ordinal int, isMax bool)

// buildGroupConcat builds the AggFunc implementation for function "GROUP_CONCAT".
func buildGroupConcat(aggFuncDesc *aggregation.AggFuncDesc, ordinal int) AggFunc {
return nil
// TODO: There might be different kind of types of the args,
// we should add CastAsString upon every arg after cast can be pushed down to coprocessor.
// And this check can be removed at that time.
for _, arg := range aggFuncDesc.Args {
if arg.GetType().EvalType() != types.ETString {
return nil
}
}
switch aggFuncDesc.Mode {
case aggregation.DedupMode:
return nil
default:
base := baseAggFunc{
args: aggFuncDesc.Args[:len(aggFuncDesc.Args)-1],
ordinal: ordinal,
}
// The last arg is promised to be a not-null string constant, so the error can be ignored.
c, _ := aggFuncDesc.Args[len(aggFuncDesc.Args)-1].(*expression.Constant)
sep, _, err := c.EvalString(nil, nil)
// This err should never happen.
if err != nil {
panic(fmt.Sprintf("Error happened when buildGroupConcat: %s", errors.Trace(err).Error()))
}
if aggFuncDesc.HasDistinct {
return &groupConcatDistinct{baseGroupConcat4String{baseAggFunc: base, sep: sep}}
}
return &groupConcat{baseGroupConcat4String{baseAggFunc: base, sep: sep}}
}
}

// buildBitOr builds the AggFunc implementation for function "BIT_OR".
Expand Down
144 changes: 144 additions & 0 deletions executor/aggfuncs/func_group_concat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
"bytes"

"github.com/juju/errors"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

type baseGroupConcat4String struct {
baseAggFunc

sep string
}

func (e *baseGroupConcat4String) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
p := (*partialResult4GroupConcat)(pr)
if p.buffer == nil {
chk.AppendNull(e.ordinal)
return nil
}
chk.AppendString(e.ordinal, p.buffer.String())
return nil
}

type basePartialResult4GroupConcat struct {
buffer *bytes.Buffer
}

type partialResult4GroupConcat struct {
basePartialResult4GroupConcat
}

type groupConcat struct {
baseGroupConcat4String
}

func (e *groupConcat) AllocPartialResult() PartialResult {
return PartialResult(new(partialResult4GroupConcat))
}

func (e *groupConcat) ResetPartialResult(pr PartialResult) {
p := (*partialResult4GroupConcat)(pr)
p.buffer = nil
}

func (e *groupConcat) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4GroupConcat)(pr)
v, isNull, isWriteSep := "", false, false
for _, row := range rowsInGroup {
isWriteSep = false
for _, arg := range e.args {
v, isNull, err = arg.EvalString(sctx, row)
if err != nil {
return errors.Trace(err)
}
if isNull {
continue
}
isWriteSep = true
if p.buffer == nil {
p.buffer = &bytes.Buffer{}
}
p.buffer.WriteString(v)
}
if isWriteSep {
p.buffer.WriteString(e.sep)
}
}
p.buffer.Truncate(p.buffer.Len() - len(e.sep))
// TODO: if total length is greater than global var group_concat_max_len, truncate it.
// issue: #7034
return nil
}

type partialResult4GroupConcatDistinct struct {
basePartialResult4GroupConcat
valsBuf *bytes.Buffer
valSet stringSet
}

type groupConcatDistinct struct {
baseGroupConcat4String
}

func (e *groupConcatDistinct) AllocPartialResult() PartialResult {
p := new(partialResult4GroupConcatDistinct)
p.valsBuf = &bytes.Buffer{}
p.valSet = newStringSet()
return PartialResult(p)
}

func (e *groupConcatDistinct) ResetPartialResult(pr PartialResult) {
p := (*partialResult4GroupConcatDistinct)(pr)
p.buffer, p.valSet = nil, newStringSet()
}

func (e *groupConcatDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (err error) {
p := (*partialResult4GroupConcatDistinct)(pr)
v, isNull := "", false
for _, row := range rowsInGroup {
p.valsBuf.Reset()
for _, arg := range e.args {
v, isNull, err = arg.EvalString(sctx, row)
if err != nil {
return errors.Trace(err)
}
if isNull {
continue
}
p.valsBuf.WriteString(v)
}
joinedVals := p.valsBuf.String()
if p.valSet.exist(joinedVals) {
continue
}
p.valSet.insert(joinedVals)
// write separator
if p.buffer == nil {
p.buffer = &bytes.Buffer{}
} else {
p.buffer.WriteString(e.sep)
}
// write values
p.buffer.WriteString(joinedVals)
}
// TODO: if total length is greater than global var group_concat_max_len, truncate it.
// issue: #7034
return nil
}
3 changes: 3 additions & 0 deletions executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ func (s *testSuite) TestGroupConcatAggr(c *C) {

result = tk.MustQuery("select id, group_concat(name SEPARATOR '') from test group by id order by id")
result.Check(testkit.Rows("1 102030", "2 20", "3 200500"))

result = tk.MustQuery("select id, group_concat(name SEPARATOR '123') from test group by id order by id")
result.Check(testkit.Rows("1 101232012330", "2 20", "3 200123500"))
}

func (s *testSuite) TestSelectDistinct(c *C) {
Expand Down
29 changes: 21 additions & 8 deletions tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ var (
)

const (
idLen = 8
prefixLen = 1 + idLen /*tableID*/ + 2
recordRowKeyLen = prefixLen + idLen /*handle*/
idLen = 8
prefixLen = 1 + idLen /*tableID*/ + 2
recordRowKeyLen = prefixLen + idLen /*handle*/
tablePrefixLength = 1
recordPrefixSepLength = 2
)

// TableSplitKeyLen is the length of key 't{table_id}' which is used for table split.
Expand Down Expand Up @@ -84,25 +86,36 @@ func EncodeRecordKey(recordPrefix kv.Key, h int64) kv.Key {
return buf
}

func hasTablePrefix(key kv.Key) bool {
return key[0] == tablePrefix[0]
}

func hasRecordPrefixSep(key kv.Key) bool {
return key[0] == recordPrefixSep[0] && key[1] == recordPrefixSep[1]
}

// DecodeRecordKey decodes the key and gets the tableID, handle.
func DecodeRecordKey(key kv.Key) (tableID int64, handle int64, err error) {
if len(key) <= prefixLen {
return 0, 0, errInvalidRecordKey.Gen("invalid record key - %q", key)
}

k := key
if !key.HasPrefix(tablePrefix) {
if !hasTablePrefix(key) {
return 0, 0, errInvalidRecordKey.Gen("invalid record key - %q", k)
}

key = key[len(tablePrefix):]
key = key[tablePrefixLength:]
key, tableID, err = codec.DecodeInt(key)
if err != nil {
return 0, 0, errors.Trace(err)
}

if !key.HasPrefix(recordPrefixSep) {
if !hasRecordPrefixSep(key) {
return 0, 0, errInvalidRecordKey.Gen("invalid record key - %q", k)
}

key = key[len(recordPrefixSep):]

key = key[recordPrefixSepLength:]
key, handle, err = codec.DecodeInt(key)
if err != nil {
return 0, 0, errors.Trace(err)
Expand Down
15 changes: 15 additions & 0 deletions tablecodec/tablecodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -359,3 +360,17 @@ func (s *testTableCodecSuite) TestDecodeIndexKey(c *C) {
c.Assert(decodeIndexID, Equals, indexID)
c.Assert(decodeValues, DeepEquals, valueStrs)
}

func BenchmarkHasTablePrefix(b *testing.B) {
k := kv.Key("foobar")
for i := 0; i < b.N; i++ {
hasTablePrefix(k)
}
}

func BenchmarkHasTablePrefixBuiltin(b *testing.B) {
k := kv.Key("foobar")
for i := 0; i < b.N; i++ {
k.HasPrefix(tablePrefix)
}
}

0 comments on commit 726ebe2

Please sign in to comment.