-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
table: use evalBuffer to improve performance of locatePartition #18818
Merged
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
56e33bd
optimize hash and range partition insert
47b80c4
add eval buffer for partition
5d76b54
fix make dev
fb00659
go fmt
64e8e7a
format import
96caa21
fix ci test
b7cd93c
use pool to avoid race
3777069
use pointer
8655243
add some test
9ffbd23
fix range partition bug
bd3bff8
add multi table test case
3911ebf
fix test case fail
24356ea
fix mysql test case
38c7f00
fix mysql test case
56733c0
add addition column for handle
54b7b8d
Merge branch 'master' into acc_partition_insert
ti-srebot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,11 +21,13 @@ import ( | |
"sort" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/pingcap/errors" | ||
"github.com/pingcap/parser" | ||
"github.com/pingcap/parser/ast" | ||
"github.com/pingcap/parser/model" | ||
"github.com/pingcap/parser/mysql" | ||
"github.com/pingcap/tidb/expression" | ||
"github.com/pingcap/tidb/kv" | ||
"github.com/pingcap/tidb/sessionctx" | ||
|
@@ -65,8 +67,10 @@ func (p *partition) GetPhysicalID() int64 { | |
// partitionedTable is a table, it contains many Partitions. | ||
type partitionedTable struct { | ||
TableCommon | ||
partitionExpr *PartitionExpr | ||
partitions map[int64]*partition | ||
partitionExpr *PartitionExpr | ||
partitions map[int64]*partition | ||
evalBufferTypes []*types.FieldType | ||
evalBufferPool sync.Pool | ||
} | ||
|
||
func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Table, error) { | ||
|
@@ -76,7 +80,12 @@ func newPartitionedTable(tbl *TableCommon, tblInfo *model.TableInfo) (table.Tabl | |
return nil, errors.Trace(err) | ||
} | ||
ret.partitionExpr = partitionExpr | ||
|
||
initEvalBufferType(ret) | ||
ret.evalBufferPool = sync.Pool{ | ||
New: func() interface{} { | ||
return initEvalBuffer(ret) | ||
}, | ||
} | ||
if err := initTableIndices(&ret.TableCommon); err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
@@ -125,6 +134,28 @@ type PartitionExpr struct { | |
*ForRangeColumnsPruning | ||
} | ||
|
||
func initEvalBufferType(t *partitionedTable) { | ||
hasExtraHandle := false | ||
numCols := len(t.Cols()) | ||
if !t.Meta().PKIsHandle { | ||
hasExtraHandle = true | ||
numCols++ | ||
} | ||
t.evalBufferTypes = make([]*types.FieldType, numCols) | ||
for i, col := range t.Cols() { | ||
t.evalBufferTypes[i] = &col.FieldType | ||
} | ||
|
||
if hasExtraHandle { | ||
t.evalBufferTypes[len(t.evalBufferTypes)-1] = types.NewFieldType(mysql.TypeLonglong) | ||
} | ||
} | ||
|
||
func initEvalBuffer(t *partitionedTable) *chunk.MutRow { | ||
evalBuffer := chunk.MutRowFromTypes(t.evalBufferTypes) | ||
return &evalBuffer | ||
} | ||
|
||
// ForRangeColumnsPruning is used for range partition pruning. | ||
type ForRangeColumnsPruning struct { | ||
LessThan []expression.Expression | ||
|
@@ -237,9 +268,9 @@ func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, | |
// The caller should assure partition info is not nil. | ||
locateExprs := make([]expression.Expression, 0, len(pi.Definitions)) | ||
var buf bytes.Buffer | ||
p := parser.New() | ||
schema := expression.NewSchema(columns...) | ||
partStr := rangePartitionString(pi) | ||
p := parser.New() | ||
for i := 0; i < len(pi.Definitions); i++ { | ||
if strings.EqualFold(pi.Definitions[i].LessThan[0], "MAXVALUE") { | ||
// Expr less than maxvalue is always true. | ||
|
@@ -263,10 +294,15 @@ func generateRangePartitionExpr(ctx sessionctx.Context, pi *model.PartitionInfo, | |
|
||
switch len(pi.Columns) { | ||
case 0: | ||
exprs, err := parseSimpleExprWithNames(p, ctx, pi.Expr, schema, names) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tmp, err := dataForRangePruning(ctx, pi) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
ret.Expr = exprs | ||
ret.ForRangePruning = tmp | ||
case 1: | ||
tmp, err := dataForRangeColumnsPruning(ctx, pi, schema, names, p) | ||
|
@@ -318,7 +354,11 @@ func (t *partitionedTable) locatePartition(ctx sessionctx.Context, pi *model.Par | |
var idx int | ||
switch t.meta.Partition.Type { | ||
case model.PartitionTypeRange: | ||
idx, err = t.locateRangePartition(ctx, pi, r) | ||
if len(pi.Columns) == 0 { | ||
idx, err = t.locateRangePartition(ctx, pi, r) | ||
} else { | ||
idx, err = t.locateRangeColumnPartition(ctx, pi, r) | ||
} | ||
case model.PartitionTypeHash: | ||
idx, err = t.locateHashPartition(ctx, pi, r) | ||
} | ||
|
@@ -328,13 +368,15 @@ func (t *partitionedTable) locatePartition(ctx sessionctx.Context, pi *model.Par | |
return pi.Definitions[idx].ID, nil | ||
} | ||
|
||
func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { | ||
func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { | ||
var err error | ||
var isNull bool | ||
partitionExprs := t.partitionExpr.UpperBounds | ||
evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow) | ||
defer t.evalBufferPool.Put(evalBuffer) | ||
idx := sort.Search(len(partitionExprs), func(i int) bool { | ||
var ret int64 | ||
ret, isNull, err = partitionExprs[i].EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow()) | ||
evalBuffer.SetDatums(r...) | ||
ret, isNull, err := partitionExprs[i].EvalInt(ctx, evalBuffer.ToRow()) | ||
if err != nil { | ||
return true // Break the search. | ||
} | ||
|
@@ -371,9 +413,74 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *mode | |
return idx, nil | ||
} | ||
|
||
func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { | ||
var ( | ||
ret int64 | ||
val int64 | ||
isNull bool | ||
err error | ||
) | ||
if col, ok := t.partitionExpr.Expr.(*expression.Column); ok { | ||
if r[col.Index].IsNull() { | ||
isNull = true | ||
} | ||
ret = r[col.Index].GetInt64() | ||
} else { | ||
evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow) | ||
defer t.evalBufferPool.Put(evalBuffer) | ||
evalBuffer.SetDatums(r...) | ||
val, isNull, err = t.partitionExpr.Expr.EvalInt(ctx, evalBuffer.ToRow()) | ||
if err != nil { | ||
return 0, err | ||
} | ||
ret = val | ||
} | ||
unsigned := mysql.HasUnsignedFlag(t.partitionExpr.Expr.GetType().Flag) | ||
ranges := t.partitionExpr.ForRangePruning | ||
length := len(ranges.LessThan) | ||
pos := sort.Search(length, func(i int) bool { | ||
if isNull { | ||
return true | ||
} | ||
return ranges.compare(i, ret, unsigned) > 0 | ||
}) | ||
if isNull { | ||
pos = 0 | ||
} | ||
if pos < 0 || pos >= length { | ||
// The data does not belong to any of the partition returns `table has no partition for value %s`. | ||
var valueMsg string | ||
if pi.Expr != "" { | ||
e, err := expression.ParseSimpleExprWithTableInfo(ctx, pi.Expr, t.meta) | ||
if err == nil { | ||
val, _, err := e.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow()) | ||
if err == nil { | ||
valueMsg = fmt.Sprintf("%d", val) | ||
} | ||
} | ||
} else { | ||
// When the table is partitioned by range columns. | ||
valueMsg = "from column_list" | ||
} | ||
return 0, table.ErrNoPartitionForGivenValue.GenWithStackByArgs(valueMsg) | ||
} | ||
return pos, nil | ||
} | ||
|
||
// TODO: supports linear hashing | ||
func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum) (int, error) { | ||
ret, isNull, err := t.partitionExpr.Expr.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow()) | ||
if col, ok := t.partitionExpr.Expr.(*expression.Column); ok { | ||
ret := r[col.Index].GetInt64() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. r[col.Index] is null?
lysu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ret = ret % int64(t.meta.Partition.Num) | ||
if ret < 0 { | ||
ret = -ret | ||
} | ||
return int(ret), nil | ||
} | ||
evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow) | ||
defer t.evalBufferPool.Put(evalBuffer) | ||
evalBuffer.SetDatums(r...) | ||
ret, isNull, err := t.partitionExpr.Expr.EvalInt(ctx, evalBuffer.ToRow()) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
@@ -542,3 +649,31 @@ func rewritePartitionExpr(ctx sessionctx.Context, field ast.ExprNode, schema *ex | |
expr, err := expression.RewriteSimpleExprWithNames(ctx, field, schema, names) | ||
return expr, err | ||
} | ||
|
||
func compareUnsigned(v1, v2 int64) int { | ||
switch { | ||
case uint64(v1) > uint64(v2): | ||
return 1 | ||
case uint64(v1) == uint64(v2): | ||
return 0 | ||
} | ||
return -1 | ||
} | ||
|
||
func (lt *ForRangePruning) compare(ith int, v int64, unsigned bool) int { | ||
if ith == len(lt.LessThan)-1 { | ||
if lt.MaxValue { | ||
return 1 | ||
} | ||
} | ||
if unsigned { | ||
return compareUnsigned(lt.LessThan[ith], v) | ||
} | ||
switch { | ||
case lt.LessThan[ith] > v: | ||
return 1 | ||
case lt.LessThan[ith] == v: | ||
return 0 | ||
} | ||
return -1 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would the column be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add some test to cover these cases, PTAL @tiancaiamao