Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: make chunk.SwapColumn private (#57274) #57369

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,12 +798,15 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) exec.Executor
end: v.Offset + v.Count,
}

childUsedSchemaLen := v.Children()[0].Schema().Len()
childSchemaLen := v.Children()[0].Schema().Len()
childUsedSchema := markChildrenUsedCols(v.Schema().Columns, v.Children()[0].Schema())[0]
e.columnIdxsUsedByChild = make([]int, 0, len(childUsedSchema))
e.columnIdxsUsedByChild = append(e.columnIdxsUsedByChild, childUsedSchema...)
if len(e.columnIdxsUsedByChild) == childUsedSchemaLen {
if len(e.columnIdxsUsedByChild) == childSchemaLen {
e.columnIdxsUsedByChild = nil // indicates that all columns are used. LimitExec will improve performance for this condition.
} else {
// construct a project evaluator to do the inline projection
e.columnSwapHelper = chunk.NewColumnSwapHelper(e.columnIdxsUsedByChild)
}
return e
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,7 @@ type LimitExec struct {

// columnIdxsUsedByChild keep column indexes of child executor used for inline projection
columnIdxsUsedByChild []int
columnSwapHelper *chunk.ColumnSwapHelper

// Log the close time when opentracing is enabled.
span opentracing.Span
Expand Down Expand Up @@ -1382,10 +1383,9 @@ func (e *LimitExec) Next(ctx context.Context, req *chunk.Chunk) error {
e.cursor += batchSize

if e.columnIdxsUsedByChild != nil {
for i, childIdx := range e.columnIdxsUsedByChild {
if err = req.SwapColumn(i, e.childResult, childIdx); err != nil {
return err
}
err = e.columnSwapHelper.SwapColumns(e.childResult, req)
if err != nil {
return err
}
} else {
req.SwapColumns(e.childResult)
Expand Down
1 change: 0 additions & 1 deletion pkg/expression/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ go_library(
"//pkg/util/encrypt",
"//pkg/util/generatedexpr",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/intset",
"//pkg/util/logutil",
"//pkg/util/mathutil",
Expand Down
133 changes: 9 additions & 124 deletions pkg/expression/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,127 +15,10 @@
package expression

import (
"sync/atomic"

"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/disjointset"
"github.com/pingcap/tidb/pkg/util/intest"
)

type columnEvaluator struct {
inputIdxToOutputIdxes map[int][]int
// mergedInputIdxToOutputIdxes is only determined in runtime when saw the input chunk.
mergedInputIdxToOutputIdxes atomic.Pointer[map[int][]int]
}

// run evaluates "Column" expressions.
// NOTE: It should be called after all the other expressions are evaluated
//
// since it will change the content of the input Chunk.
func (e *columnEvaluator) run(ctx sessionctx.Context, input, output *chunk.Chunk) error {
// mergedInputIdxToOutputIdxes only can be determined in runtime when we saw the input chunk structure.
if e.mergedInputIdxToOutputIdxes.Load() == nil {
e.mergeInputIdxToOutputIdxes(input, e.inputIdxToOutputIdxes)
}
for inputIdx, outputIdxes := range *e.mergedInputIdxToOutputIdxes.Load() {
if err := output.SwapColumn(outputIdxes[0], input, inputIdx); err != nil {
return err
}
for i, length := 1, len(outputIdxes); i < length; i++ {
output.MakeRef(outputIdxes[0], outputIdxes[i])
}
}
return nil
}

// mergeInputIdxToOutputIdxes merges separate inputIdxToOutputIdxes entries when column references
// are detected within the input chunk. This process ensures consistent handling of columns derived
// from the same original source.
//
// Consider the following scenario:
//
// Initial scan operation produces a column 'a':
//
// scan: a (addr: ???)
//
// This column 'a' is used in the first projection (proj1) to create two columns a1 and a2, both referencing 'a':
//
// proj1
// / \
// / \
// / \
// a1 (addr: 0xe) a2 (addr: 0xe)
// / \
// / \
// / \
// proj2 proj2
// / \ / \
// / \ / \
// a3 a4 a5 a6
//
// (addr: 0xe) (addr: 0xe) (addr: 0xe) (addr: 0xe)
//
// Here, a1 and a2 share the same address (0xe), indicating they reference the same data from the original 'a'.
//
// When moving to the second projection (proj2), the system tries to project these columns further:
// - The first set (left side) consists of a3 and a4, derived from a1, both retaining the address (0xe).
// - The second set (right side) consists of a5 and a6, derived from a2, also starting with address (0xe).
//
// When proj1 is complete, the output chunk contains two columns [a1, a2], both derived from the single column 'a' from the scan.
// Since both a1 and a2 are column references with the same address (0xe), they are treated as referencing the same data.
//
// In proj2, two separate <inputIdx, []outputIdxes> items are created:
// - <0, [0,1]>: This means the 0th input column (a1) is projected twice, into the 0th and 1st columns of the output chunk.
// - <1, [2,3]>: This means the 1st input column (a2) is projected twice, into the 2nd and 3rd columns of the output chunk.
//
// Due to the column swapping logic in each projection, after applying the <0, [0,1]> projection,
// the addresses for a1 and a2 may become swapped or invalid:
//
// proj1: a1 (addr: invalid) a2 (addr: invalid)
//
// This can lead to issues in proj2, where further operations on these columns may be unsafe:
//
// proj2: a3 (addr: 0xe) a4 (addr: 0xe) a5 (addr: ???) a6 (addr: ???)
//
// Therefore, it's crucial to identify and merge the original column references early, ensuring
// the final inputIdxToOutputIdxes mapping accurately reflects the shared origins of the data.
// For instance, <0, [0,1,2,3]> indicates that the 0th input column (original 'a') is referenced
// by all four output columns in the final output.
//
// mergeInputIdxToOutputIdxes merges inputIdxToOutputIdxes based on detected column references.
// This ensures that columns with the same reference are correctly handled in the output chunk.
func (e *columnEvaluator) mergeInputIdxToOutputIdxes(input *chunk.Chunk, inputIdxToOutputIdxes map[int][]int) {
originalDJSet := disjointset.NewSet[int](4)
flag := make([]bool, input.NumCols())
// Detect self column-references inside the input chunk by comparing column addresses
for i := 0; i < input.NumCols(); i++ {
if flag[i] {
continue
}
for j := i + 1; j < input.NumCols(); j++ {
if input.Column(i) == input.Column(j) {
flag[j] = true
originalDJSet.Union(i, j)
}
}
}
// Merge inputIdxToOutputIdxes based on the detected column references.
newInputIdxToOutputIdxes := make(map[int][]int, len(inputIdxToOutputIdxes))
for inputIdx := range inputIdxToOutputIdxes {
// Root idx is internal offset, not the right column index.
originalRootIdx := originalDJSet.FindRoot(inputIdx)
originalVal, ok := originalDJSet.FindVal(originalRootIdx)
intest.Assert(ok)
mergedOutputIdxes := newInputIdxToOutputIdxes[originalVal]
mergedOutputIdxes = append(mergedOutputIdxes, inputIdxToOutputIdxes[inputIdx]...)
newInputIdxToOutputIdxes[originalVal] = mergedOutputIdxes
}
// Update the merged inputIdxToOutputIdxes automatically.
// Once failed, it means other worker has done this job at meantime.
e.mergedInputIdxToOutputIdxes.CompareAndSwap(nil, &newInputIdxToOutputIdxes)
}

type defaultEvaluator struct {
outputIdxes []int
exprs []Expression
Expand Down Expand Up @@ -176,8 +59,8 @@ func (e *defaultEvaluator) run(ctx sessionctx.Context, input, output *chunk.Chun
// It separates them to "column" and "other" expressions and evaluates "other"
// expressions before "column" expressions.
type EvaluatorSuite struct {
*columnEvaluator // Evaluator for column expressions.
*defaultEvaluator // Evaluator for other expressions.
ColumnSwapHelper *chunk.ColumnSwapHelper // Evaluator for column expressions.
*defaultEvaluator // Evaluator for other expressions.
}

// NewEvaluatorSuite creates an EvaluatorSuite to evaluate all the exprs.
Expand All @@ -187,11 +70,11 @@ func NewEvaluatorSuite(exprs []Expression, avoidColumnEvaluator bool) *Evaluator

for i := 0; i < len(exprs); i++ {
if col, isCol := exprs[i].(*Column); isCol && !avoidColumnEvaluator {
if e.columnEvaluator == nil {
e.columnEvaluator = &columnEvaluator{inputIdxToOutputIdxes: make(map[int][]int)}
if e.ColumnSwapHelper == nil {
e.ColumnSwapHelper = &chunk.ColumnSwapHelper{InputIdxToOutputIdxes: make(map[int][]int)}
}
inputIdx, outputIdx := col.Index, i
e.columnEvaluator.inputIdxToOutputIdxes[inputIdx] = append(e.columnEvaluator.inputIdxToOutputIdxes[inputIdx], outputIdx)
e.ColumnSwapHelper.InputIdxToOutputIdxes[inputIdx] = append(e.ColumnSwapHelper.InputIdxToOutputIdxes[inputIdx], outputIdx)
continue
}
if e.defaultEvaluator == nil {
Expand Down Expand Up @@ -225,8 +108,10 @@ func (e *EvaluatorSuite) Run(ctx sessionctx.Context, input, output *chunk.Chunk)
}
}

if e.columnEvaluator != nil {
return e.columnEvaluator.run(ctx, input, output)
// NOTE: It should be called after all the other expressions are evaluated
// since it will change the content of the input Chunk.
if e.ColumnSwapHelper != nil {
return e.ColumnSwapHelper.SwapColumns(input, output)
}
return nil
}
40 changes: 0 additions & 40 deletions pkg/expression/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package expression

import (
"slices"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -594,42 +593,3 @@ func TestMod(t *testing.T) {
require.NoError(t, err)
require.Equal(t, types.NewDatum(1.5), r)
}

func TestMergeInputIdxToOutputIdxes(t *testing.T) {
ctx := createContext(t)
inputIdxToOutputIdxes := make(map[int][]int)
// input 0th should be column referred as 0th and 1st in output columns.
inputIdxToOutputIdxes[0] = []int{0, 1}
// input 1th should be column referred as 2nd and 3rd in output columns.
inputIdxToOutputIdxes[1] = []int{2, 3}
columnEval := columnEvaluator{inputIdxToOutputIdxes: inputIdxToOutputIdxes}

input := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}, 2)
input.AppendInt64(0, 99)
// input chunk's 0th and 1st are column referred itself.
input.MakeRef(0, 1)

// chunk: col1 <---(ref) col2
// ____________/ \___________/ \___
// proj: col1 col2 col3 col4
//
// original case after inputIdxToOutputIdxes[0], the original col2 will be nil pointer
// cause consecutive col3,col4 ref projection are invalid.
//
// after fix, the new inputIdxToOutputIdxes should be: inputIdxToOutputIdxes[0]: {0, 1, 2, 3}

output := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong),
types.NewFieldType(mysql.TypeLonglong), types.NewFieldType(mysql.TypeLonglong)}, 2)

err := columnEval.run(ctx, input, output)
require.NoError(t, err)
// all four columns are column-referred, pointing to the first one.
require.Equal(t, output.Column(0), output.Column(1))
require.Equal(t, output.Column(1), output.Column(2))
require.Equal(t, output.Column(2), output.Column(3))
require.Equal(t, output.GetRow(0).GetInt64(0), int64(99))

require.Equal(t, len(*columnEval.mergedInputIdxToOutputIdxes.Load()), 1)
slices.Sort((*columnEval.mergedInputIdxToOutputIdxes.Load())[0])
require.Equal(t, (*columnEval.mergedInputIdxToOutputIdxes.Load())[0], []int{0, 1, 2, 3})
}
2 changes: 1 addition & 1 deletion pkg/expression/integration_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 29,
shard_count = 30,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
49 changes: 49 additions & 0 deletions pkg/expression/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3295,3 +3295,52 @@ func TestIssue43527(t *testing.T) {
"SELECT @total := @total + d FROM (SELECT d FROM test) AS temp, (SELECT @total := b FROM test) AS T1 where @total >= 100",
).Check(testkit.Rows("200", "300", "400", "500"))
}

func TestIssue55885(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t_jg8o (c_s int not null unique ,c__qy double ,c_z int not null ,c_a90ol text not null);")
tk.MustExec("insert into t_jg8o (c_s, c__qy, c_z, c_a90ol) values" +
"(-975033779, 85.65, -355481284, 'gnip' ),(-2018599732, 85.86, 1617093413, 'm' )," +
"(-960107027, 4.6, -2042358076, 'y1q')," +
"(-3, 38.1, -1528586343, 'ex_2')," +
"(69386953, 32768.0, -62220810, 'tfkxjj5c')," +
"(587181689, -9223372036854775806.3, -1666156943, 'queemvgj')," +
"(-218561796, 85.2, -670390288, 'nf990nol')," +
"(858419954, 2147483646.0, -1649362344, 'won_9')," +
"(-1120115215, 22.100, 1509989939, 'w')," +
"(-1388119356, 94.32, -1694148464, 'gu4i4knyhm')," +
"(-1016230734, -4294967295.8, 1430313391, 's')," +
"(-1861825796, 36.52, -1457928755, 'j')," +
"(1963621165, 88.87, 18928603, 'gxbsloff' )," +
"(1492879828, cast(null as double), 759883041, 'zwue')," +
"(-1607192175, 12.36, 1669523024, 'qt5zch71a')," +
"(1534068569, 46.79, -392085130, 'bc')," +
"(155707446, 9223372036854775809.4, 1727199557, 'qyghenu9t6')," +
"(-1524976778, 75.99, 335492222, 'sdgde0z')," +
"(175403335, cast(null as double), -69711503, 'ja')," +
"(-272715456, 48.62, 753928713, 'ur')," +
"(-2035825967, 257.3, -1598426762, 'lmqmn')," +
"(-1178957955, 2147483648.100000, 1432554380, 'dqpb210')," +
"(-2056628646, 254.5, -1476177588, 'k41ajpt7x')," +
"(-914210874, 126.7, -421919910, 'x57ud7oy1')," +
"(-88586773, 1.2, 1568247510, 'drmxi8')," +
"(-834563269, -4294967296.7, 1163133933, 'wp')," +
"(-84490060, 54.13, -630289437, '_3_twecg5h')," +
" (267700893, 54.75, 370343042, 'n72')," +
"(552106333, 32766.2, 2365745, 's7tt')," +
"(643440707, 65536.8, -850412592, 'wmluxa9a')," +
"(1709853766, -4294967296.5, -21041749, 'obqj0uu5v')," +
"(-7, 80.88, 528792379, 'n5qr9m26i')," +
"(-456431629, 28.43, 1958788149, 'b')," +
"(-28841240, 11.86, -1089765168, 'pqg')," +
"(-807839288, 25.89, 504535500, 'cs3tkhs')," +
"(-52910064, 85.16, 354032882, '_ffjo67yxe')," +
"(1919869830, 81.81, -272247558, 'aj')," +
"(165434725, -2147483648.0, 11, 'xxnsf5')," +
"(3, -2147483648.7, 1616632952, 'g7t8tqyi')," +
"(1851859144, 70.73, -1105664209, 'qjfhjr');")

tk.MustQuery("SELECT subq_0.c3 as c1 FROM (select c_a90ol as c3, c_a90ol as c4, var_pop(cast(c__qy as double)) over (partition by c_a90ol, c_s order by c_z) as c5 from t_jg8o limit 65) as subq_0 LIMIT 37")
}
2 changes: 2 additions & 0 deletions pkg/util/chunk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ go_library(
"//pkg/parser/terror",
"//pkg/types",
"//pkg/util/checksum",
"//pkg/util/disjointset",
"//pkg/util/disk",
"//pkg/util/encrypt",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/memory",
Expand Down
6 changes: 4 additions & 2 deletions pkg/util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,12 @@ func (c *Chunk) MakeRefTo(dstColIdx int, src *Chunk, srcColIdx int) error {
return nil
}

// SwapColumn swaps Column "c.columns[colIdx]" with Column
// swapColumn swaps Column "c.columns[colIdx]" with Column
// "other.columns[otherIdx]". If there exists columns refer to the Column to be
// swapped, we need to re-build the reference.
func (c *Chunk) SwapColumn(colIdx int, other *Chunk, otherIdx int) error {
// this function should not be used directly, if you wants to swap columns between two chunks,
// use ColumnSwapHelper.SwapColumns instead.
func (c *Chunk) swapColumn(colIdx int, other *Chunk, otherIdx int) error {
if c.sel != nil || other.sel != nil {
return errors.New(msgErrSelNotNil)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,25 +642,25 @@ func TestSwapColumn(t *testing.T) {
checkRef()

// swap two chunk's columns
require.NoError(t, chk1.SwapColumn(0, chk2, 0))
require.NoError(t, chk1.swapColumn(0, chk2, 0))
checkRef()

require.NoError(t, chk1.SwapColumn(0, chk2, 0))
require.NoError(t, chk1.swapColumn(0, chk2, 0))
checkRef()

// swap reference and referenced columns
require.NoError(t, chk2.SwapColumn(1, chk2, 0))
require.NoError(t, chk2.swapColumn(1, chk2, 0))
checkRef()

// swap the same column in the same chunk
require.NoError(t, chk2.SwapColumn(1, chk2, 1))
require.NoError(t, chk2.swapColumn(1, chk2, 1))
checkRef()

// swap reference and another column
require.NoError(t, chk2.SwapColumn(1, chk2, 2))
require.NoError(t, chk2.swapColumn(1, chk2, 2))
checkRef()

require.NoError(t, chk2.SwapColumn(2, chk2, 0))
require.NoError(t, chk2.swapColumn(2, chk2, 0))
checkRef()
}

Expand Down
Loading