15
15
package expression
16
16
17
17
import (
18
- "sync/atomic"
19
-
20
18
"github.com/pingcap/tidb/pkg/sessionctx"
21
19
"github.com/pingcap/tidb/pkg/util/chunk"
22
- "github.com/pingcap/tidb/pkg/util/disjointset"
23
- "github.com/pingcap/tidb/pkg/util/intest"
24
20
)
25
21
26
- type columnEvaluator struct {
27
- inputIdxToOutputIdxes map [int ][]int
28
- // mergedInputIdxToOutputIdxes can only be determined at runtime, once the input chunk has been seen.
29
- mergedInputIdxToOutputIdxes atomic.Pointer [map [int ][]int ]
30
- }
31
-
32
- // run evaluates "Column" expressions.
33
- // NOTE: It should be called after all the other expressions are evaluated
34
- //
35
- // since it will change the content of the input Chunk.
36
- func (e * columnEvaluator ) run (ctx sessionctx.Context , input , output * chunk.Chunk ) error {
37
- // mergedInputIdxToOutputIdxes can only be determined at runtime, once we have seen the structure of the input chunk.
38
- if e .mergedInputIdxToOutputIdxes .Load () == nil {
39
- e .mergeInputIdxToOutputIdxes (input , e .inputIdxToOutputIdxes )
40
- }
41
- for inputIdx , outputIdxes := range * e .mergedInputIdxToOutputIdxes .Load () {
42
- if err := output .SwapColumn (outputIdxes [0 ], input , inputIdx ); err != nil {
43
- return err
44
- }
45
- for i , length := 1 , len (outputIdxes ); i < length ; i ++ {
46
- output .MakeRef (outputIdxes [0 ], outputIdxes [i ])
47
- }
48
- }
49
- return nil
50
- }
51
-
52
- // mergeInputIdxToOutputIdxes merges separate inputIdxToOutputIdxes entries when column references
53
- // are detected within the input chunk. This process ensures consistent handling of columns derived
54
- // from the same original source.
55
- //
56
- // Consider the following scenario:
57
- //
58
- // Initial scan operation produces a column 'a':
59
- //
60
- // scan: a (addr: ???)
61
- //
62
- // This column 'a' is used in the first projection (proj1) to create two columns a1 and a2, both referencing 'a':
63
- //
64
- // proj1
65
- // / \
66
- // / \
67
- // / \
68
- // a1 (addr: 0xe) a2 (addr: 0xe)
69
- // / \
70
- // / \
71
- // / \
72
- // proj2 proj2
73
- // / \ / \
74
- // / \ / \
75
- // a3 a4 a5 a6
76
- //
77
- // (addr: 0xe) (addr: 0xe) (addr: 0xe) (addr: 0xe)
78
- //
79
- // Here, a1 and a2 share the same address (0xe), indicating they reference the same data from the original 'a'.
80
- //
81
- // When moving to the second projection (proj2), the system tries to project these columns further:
82
- // - The first set (left side) consists of a3 and a4, derived from a1, both retaining the address (0xe).
83
- // - The second set (right side) consists of a5 and a6, derived from a2, also starting with address (0xe).
84
- //
85
- // When proj1 is complete, the output chunk contains two columns [a1, a2], both derived from the single column 'a' from the scan.
86
- // Since both a1 and a2 are column references with the same address (0xe), they are treated as referencing the same data.
87
- //
88
- // In proj2, two separate <inputIdx, []outputIdxes> items are created:
89
- // - <0, [0,1]>: This means the 0th input column (a1) is projected twice, into the 0th and 1st columns of the output chunk.
90
- // - <1, [2,3]>: This means the 1st input column (a2) is projected twice, into the 2nd and 3rd columns of the output chunk.
91
- //
92
- // Due to the column swapping logic in each projection, after applying the <0, [0,1]> projection,
93
- // the addresses for a1 and a2 may become swapped or invalid:
94
- //
95
- // proj1: a1 (addr: invalid) a2 (addr: invalid)
96
- //
97
- // This can lead to issues in proj2, where further operations on these columns may be unsafe:
98
- //
99
- // proj2: a3 (addr: 0xe) a4 (addr: 0xe) a5 (addr: ???) a6 (addr: ???)
100
- //
101
- // Therefore, it's crucial to identify and merge the original column references early, ensuring
102
- // the final inputIdxToOutputIdxes mapping accurately reflects the shared origins of the data.
103
- // For instance, <0, [0,1,2,3]> indicates that the 0th input column (original 'a') is referenced
104
- // by all four output columns in the final output.
105
- //
106
- // mergeInputIdxToOutputIdxes merges inputIdxToOutputIdxes based on detected column references.
107
- // This ensures that columns with the same reference are correctly handled in the output chunk.
108
- func (e * columnEvaluator ) mergeInputIdxToOutputIdxes (input * chunk.Chunk , inputIdxToOutputIdxes map [int ][]int ) {
109
- originalDJSet := disjointset.NewSet [int ](4 )
110
- flag := make ([]bool , input .NumCols ())
111
- // Detect self column-references inside the input chunk by comparing column addresses
112
- for i := 0 ; i < input .NumCols (); i ++ {
113
- if flag [i ] {
114
- continue
115
- }
116
- for j := i + 1 ; j < input .NumCols (); j ++ {
117
- if input .Column (i ) == input .Column (j ) {
118
- flag [j ] = true
119
- originalDJSet .Union (i , j )
120
- }
121
- }
122
- }
123
- // Merge inputIdxToOutputIdxes based on the detected column references.
124
- newInputIdxToOutputIdxes := make (map [int ][]int , len (inputIdxToOutputIdxes ))
125
- for inputIdx := range inputIdxToOutputIdxes {
126
- // The root idx is an internal offset, not the actual column index.
127
- originalRootIdx := originalDJSet .FindRoot (inputIdx )
128
- originalVal , ok := originalDJSet .FindVal (originalRootIdx )
129
- intest .Assert (ok )
130
- mergedOutputIdxes := newInputIdxToOutputIdxes [originalVal ]
131
- mergedOutputIdxes = append (mergedOutputIdxes , inputIdxToOutputIdxes [inputIdx ]... )
132
- newInputIdxToOutputIdxes [originalVal ] = mergedOutputIdxes
133
- }
134
- // Update the merged inputIdxToOutputIdxes atomically.
135
- // If the swap fails, it means another worker has already done this job in the meantime.
136
- e .mergedInputIdxToOutputIdxes .CompareAndSwap (nil , & newInputIdxToOutputIdxes )
137
- }
138
-
139
22
type defaultEvaluator struct {
140
23
outputIdxes []int
141
24
exprs []Expression
@@ -176,8 +59,8 @@ func (e *defaultEvaluator) run(ctx sessionctx.Context, input, output *chunk.Chun
176
59
// It separates them to "column" and "other" expressions and evaluates "other"
177
60
// expressions before "column" expressions.
178
61
type EvaluatorSuite struct {
179
- * columnEvaluator // Evaluator for column expressions.
180
- * defaultEvaluator // Evaluator for other expressions.
62
+ ColumnSwapHelper * chunk. ColumnSwapHelper // Evaluator for column expressions.
63
+ * defaultEvaluator // Evaluator for other expressions.
181
64
}
182
65
183
66
// NewEvaluatorSuite creates an EvaluatorSuite to evaluate all the exprs.
@@ -187,11 +70,11 @@ func NewEvaluatorSuite(exprs []Expression, avoidColumnEvaluator bool) *Evaluator
187
70
188
71
for i := 0 ; i < len (exprs ); i ++ {
189
72
if col , isCol := exprs [i ].(* Column ); isCol && ! avoidColumnEvaluator {
190
- if e .columnEvaluator == nil {
191
- e .columnEvaluator = & columnEvaluator { inputIdxToOutputIdxes : make (map [int ][]int )}
73
+ if e .ColumnSwapHelper == nil {
74
+ e .ColumnSwapHelper = & chunk. ColumnSwapHelper { InputIdxToOutputIdxes : make (map [int ][]int )}
192
75
}
193
76
inputIdx , outputIdx := col .Index , i
194
- e .columnEvaluator . inputIdxToOutputIdxes [inputIdx ] = append (e .columnEvaluator . inputIdxToOutputIdxes [inputIdx ], outputIdx )
77
+ e .ColumnSwapHelper . InputIdxToOutputIdxes [inputIdx ] = append (e .ColumnSwapHelper . InputIdxToOutputIdxes [inputIdx ], outputIdx )
195
78
continue
196
79
}
197
80
if e .defaultEvaluator == nil {
@@ -225,8 +108,10 @@ func (e *EvaluatorSuite) Run(ctx sessionctx.Context, input, output *chunk.Chunk)
225
108
}
226
109
}
227
110
228
- if e .columnEvaluator != nil {
229
- return e .columnEvaluator .run (ctx , input , output )
111
+ // NOTE: It should be called after all the other expressions are evaluated
112
+ // since it will change the content of the input Chunk.
113
+ if e .ColumnSwapHelper != nil {
114
+ return e .ColumnSwapHelper .SwapColumns (input , output )
230
115
}
231
116
return nil
232
117
}
0 commit comments