Skip to content

Commit

Permalink
executor: fix retChk = nil in index_merge_reader (#46111) (#46277)
Browse files Browse the repository at this point in the history
close #46005
  • Loading branch information
ti-chi-bot authored May 31, 2024
1 parent cd1e029 commit 769a2f8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 19 deletions.
40 changes: 21 additions & 19 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,11 @@ func (w *partialTableWorker) getDatumRow(table table.Table, row chunk.Row, handl
func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannercore.HandleCols) (
handles []kv.Handle, retChk *chunk.Chunk, err error) {
handles = make([]kv.Handle, 0, w.batchSize)
if len(w.byItems) != 0 {
retChk = chunk.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.batchSize)
}
var memUsage int64
var chunkRowOffset int
defer w.memTracker.Consume(-memUsage)
tbl := w.tableReader.(*TableReaderExecutor).table
for len(handles) < w.batchSize {
Expand All @@ -692,7 +696,7 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
start := time.Now()
err = errors.Trace(w.tableReader.Next(ctx, chk))
if err != nil {
return handles, nil, err
return nil, nil, err
}
if be := w.tableReader.base(); be != nil && be.runtimeStats != nil {
be.runtimeStats.Record(time.Since(start), chk.NumRows())
Expand All @@ -706,15 +710,15 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
memDelta := chk.MemoryUsage()
memUsage += memDelta
w.memTracker.Consume(memDelta)
for i := 0; i < chk.NumRows(); i++ {
for chunkRowOffset = 0; chunkRowOffset < chk.NumRows(); chunkRowOffset++ {
if w.pushedLimit != nil {
w.scannedKeys++
if w.scannedKeys > (w.pushedLimit.Offset + w.pushedLimit.Count) {
// Skip the handles after Offset+Count.
return handles, retChk, nil
break
}
}
r := chk.GetRow(i)
r := chk.GetRow(chunkRowOffset)
handle, err := handleCols.BuildHandleFromIndexRow(r)
if err != nil {
return nil, nil, err
Expand All @@ -729,12 +733,9 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
}
handles = append(handles, handle)
}
// used for limit embedded.
// used for order by
if len(w.byItems) != 0 {
if retChk == nil {
retChk = chunk.NewChunkWithCapacity(w.getRetTpsForTableScan(), w.batchSize)
}
retChk.Append(chk, 0, chk.NumRows())
retChk.Append(chk, 0, chunkRowOffset)
}
}
w.batchSize *= 2
Expand Down Expand Up @@ -1545,7 +1546,11 @@ func (w *partialIndexWorker) getRetTpsForIndexScan(handleCols plannercore.Handle
func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannercore.HandleCols) (
handles []kv.Handle, retChk *chunk.Chunk, err error) {
handles = make([]kv.Handle, 0, w.batchSize)
if len(w.byItems) != 0 {
retChk = chunk.NewChunkWithCapacity(w.getRetTpsForIndexScan(handleCols, w.hasExtralPidCol()), w.batchSize)
}
var memUsage int64
var chunkRowOffset int
defer w.memTracker.Consume(-memUsage)
for len(handles) < w.batchSize {
requiredRows := w.batchSize - len(handles)
Expand All @@ -1559,7 +1564,7 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
start := time.Now()
err = errors.Trace(idxResult.Next(ctx, chk))
if err != nil {
return handles, nil, err
return nil, nil, err
}
if w.stats != nil && w.idxID != 0 {
w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID).Record(time.Since(start), chk.NumRows())
Expand All @@ -1573,31 +1578,28 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
memDelta := chk.MemoryUsage()
memUsage += memDelta
w.memTracker.Consume(memDelta)
for i := 0; i < chk.NumRows(); i++ {
for chunkRowOffset = 0; chunkRowOffset < chk.NumRows(); chunkRowOffset++ {
if w.pushedLimit != nil {
w.scannedKeys++
if w.scannedKeys > (w.pushedLimit.Offset + w.pushedLimit.Count) {
// Skip the handles after Offset+Count.
return handles, retChk, nil
break
}
}
var handle kv.Handle
if w.hasExtralPidCol() {
handle, err = handleCols.BuildPartitionHandleFromIndexRow(chk.GetRow(i))
handle, err = handleCols.BuildPartitionHandleFromIndexRow(chk.GetRow(chunkRowOffset))
} else {
handle, err = handleCols.BuildHandleFromIndexRow(chk.GetRow(i))
handle, err = handleCols.BuildHandleFromIndexRow(chk.GetRow(chunkRowOffset))
}
if err != nil {
return nil, nil, err
}
handles = append(handles, handle)
}
// used for limit embedded.
// used for order by
if len(w.byItems) != 0 {
if retChk == nil {
retChk = chunk.NewChunkWithCapacity(w.getRetTpsForIndexScan(handleCols, w.hasExtralPidCol()), w.batchSize)
}
retChk.Append(chk, 0, chk.NumRows())
retChk.Append(chk, 0, chunkRowOffset)
}
}
w.batchSize *= 2
Expand Down
13 changes: 13 additions & 0 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,3 +1138,16 @@ func TestProcessInfoRaceWithIndexScan(t *testing.T) {
}
wg.Wait()
}

func TestIssues46005(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set tidb_index_lookup_size = 1024")
tk.MustExec("create table t(a int, b int, c int, index idx1(a, c), index idx2(b, c))")
for i := 0; i < 1500; i++ {
tk.MustExec(fmt.Sprintf("insert into t(a,b,c) values (1, 1, %d)", i))
}

tk.MustQuery("select /*+ USE_INDEX_MERGE(t, idx1, idx2) */ * from t where a = 1 or b = 1 order by c limit 1025")
}

0 comments on commit 769a2f8

Please sign in to comment.