Skip to content

Commit

Permalink
executor: Fix crash during sort spill (#47581) (#47626)
Browse files Browse the repository at this point in the history
close #47538
  • Loading branch information
ti-chi-bot authored Oct 17, 2023
1 parent 34c0ef6 commit a099a2a
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 68 deletions.
5 changes: 4 additions & 1 deletion executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
}
}
if e.rowChunks.NumRow() > 0 {
e.rowChunks.Sort()
err := e.rowChunks.Sort()
if err != nil {
return err
}
e.partitionList = append(e.partitionList, e.rowChunks)
}
return nil
Expand Down
160 changes: 93 additions & 67 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package chunk

import (
"errors"
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -73,6 +74,11 @@ func (m *mutexForRowContainer) RUnlock() {
m.rLock.RUnlock()
}

type spillHelper interface {
SpillToDisk()
hasEnoughDataToSpill(t *memory.Tracker) bool
}

// RowContainer provides a place for many rows, so many that we might want to spill them into disk.
// nolint:structcheck
type RowContainer struct {
Expand Down Expand Up @@ -118,6 +124,14 @@ func (c *RowContainer) ShallowCopyWithNewMutex() *RowContainer {

// SpillToDisk spills data to disk. This function may be called in parallel.
func (c *RowContainer) SpillToDisk() {
c.spillToDisk(nil)
}

func (*RowContainer) hasEnoughDataToSpill(_ *memory.Tracker) bool {
return true
}

func (c *RowContainer) spillToDisk(preSpillError error) {
c.m.Lock()
defer c.m.Unlock()
if c.alreadySpilled() {
Expand All @@ -137,6 +151,22 @@ func (c *RowContainer) SpillToDisk() {
N := c.m.records.inMemory.NumChunks()
c.m.records.inDisk = NewListInDisk(c.m.records.inMemory.FieldTypes())
c.m.records.inDisk.diskTracker.AttachTo(c.diskTracker)
defer func() {
if r := recover(); r != nil {
err := fmt.Errorf("%v", r)
c.m.records.spillError = err
logutil.BgLogger().Error("spill to disk failed", zap.Stack("stack"), zap.Error(err))
}
}()
failpoint.Inject("spillToDiskOutOfDiskQuota", func(val failpoint.Value) {
if val.(bool) {
panic("out of disk quota when spilling")
}
})
if preSpillError != nil {
c.m.records.spillError = preSpillError
return
}
for i := 0; i < N; i++ {
chk := c.m.records.inMemory.GetChunk(i)
err = c.m.records.inDisk.Add(chk)
Expand Down Expand Up @@ -290,8 +320,9 @@ func (c *RowContainer) Close() (err error) {
func (c *RowContainer) ActionSpill() *SpillDiskAction {
if c.actionSpill == nil {
c.actionSpill = &SpillDiskAction{
c: c,
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}}
c: c,
baseSpillDiskAction: &baseSpillDiskAction{cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled}},
}
}
return c.actionSpill
}
Expand All @@ -300,23 +331,21 @@ func (c *RowContainer) ActionSpill() *SpillDiskAction {
func (c *RowContainer) ActionSpillForTest() *SpillDiskAction {
c.actionSpill = &SpillDiskAction{
c: c,
testSyncInputFunc: func() {
c.actionSpill.testWg.Add(1)
baseSpillDiskAction: &baseSpillDiskAction{
testSyncInputFunc: func() {
c.actionSpill.testWg.Add(1)
},
testSyncOutputFunc: func() {
c.actionSpill.testWg.Done()
},
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
},
testSyncOutputFunc: func() {
c.actionSpill.testWg.Done()
},
cond: spillStatusCond{sync.NewCond(new(sync.Mutex)), notSpilled},
}
return c.actionSpill
}

// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
type SpillDiskAction struct {
type baseSpillDiskAction struct {
memory.BaseOOMAction
c *RowContainer
m sync.Mutex
once sync.Once
cond spillStatusCond
Expand All @@ -327,6 +356,20 @@ type SpillDiskAction struct {
testWg sync.WaitGroup
}

// SpillDiskAction implements memory.ActionOnExceed for chunk.List. If
// the memory quota of a query is exceeded, SpillDiskAction.Action is
// triggered.
type SpillDiskAction struct {
c *RowContainer
*baseSpillDiskAction
}

// Action sends a signal to trigger spillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SpillDiskAction) Action(t *memory.Tracker) {
a.action(t, a.c)
}

type spillStatusCond struct {
*sync.Cond
// status indicates different stages for the Action
Expand All @@ -344,38 +387,35 @@ const (
spilledYet
)

func (a *SpillDiskAction) setStatus(status spillStatus) {
func (a *baseSpillDiskAction) setStatus(status spillStatus) {
a.cond.L.Lock()
defer a.cond.L.Unlock()
a.cond.status = status
}

func (a *SpillDiskAction) getStatus() spillStatus {
func (a *baseSpillDiskAction) getStatus() spillStatus {
a.cond.L.Lock()
defer a.cond.L.Unlock()
return a.cond.status
}

// Action sends a signal to trigger spillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SpillDiskAction) Action(t *memory.Tracker) {
func (a *baseSpillDiskAction) action(t *memory.Tracker, spillHelper spillHelper) {
a.m.Lock()
defer a.m.Unlock()

if a.getStatus() == notSpilled {
if a.getStatus() == notSpilled && spillHelper.hasEnoughDataToSpill(t) {
a.once.Do(func() {
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
if a.testSyncInputFunc != nil {
a.testSyncInputFunc()
c := a.c
go func() {
c.SpillToDisk()
spillHelper.SpillToDisk()
a.testSyncOutputFunc()
}()
return
}
go a.c.SpillToDisk()
go spillHelper.SpillToDisk()
})
return
}
Expand All @@ -395,7 +435,7 @@ func (a *SpillDiskAction) Action(t *memory.Tracker) {
}

// Reset resets the status for SpillDiskAction.
func (a *SpillDiskAction) Reset() {
func (a *baseSpillDiskAction) Reset() {
a.m.Lock()
defer a.m.Unlock()
a.setStatus(notSpilled)
Expand All @@ -406,12 +446,12 @@ func (a *SpillDiskAction) Reset() {
func (a *SpillDiskAction) SetLogHook(hook func(uint64)) {}

// GetPriority get the priority of the Action.
func (a *SpillDiskAction) GetPriority() int64 {
func (a *baseSpillDiskAction) GetPriority() int64 {
return memory.DefSpillPriority
}

// WaitForTest waits all goroutine have gone.
func (a *SpillDiskAction) WaitForTest() {
func (a *baseSpillDiskAction) WaitForTest() {
a.testWg.Wait()
}

Expand Down Expand Up @@ -478,9 +518,15 @@ func (c *SortedRowContainer) keyColumnsLess(i, j int) bool {
}

// Sort inits pointers and sorts the records.
func (c *SortedRowContainer) Sort() {
func (c *SortedRowContainer) Sort() (ret error) {
c.ptrM.Lock()
defer c.ptrM.Unlock()
ret = nil
defer func() {
if r := recover(); r != nil {
ret = fmt.Errorf("%v", r)
}
}()
if c.ptrM.rowPtrs != nil {
return
}
Expand All @@ -495,13 +541,25 @@ func (c *SortedRowContainer) Sort() {
c.ptrM.rowPtrs = append(c.ptrM.rowPtrs, RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
failpoint.Inject("errorDuringSortRowContainer", func(val failpoint.Value) {
if val.(bool) {
panic("sort meet error")
}
})
sort.Slice(c.ptrM.rowPtrs, c.keyColumnsLess)
c.GetMemTracker().Consume(int64(8 * c.numRow))
return
}

func (c *SortedRowContainer) sortAndSpillToDisk() {
c.Sort()
c.RowContainer.SpillToDisk()
// SpillToDisk spills data to disk. This function may be called in parallel.
func (c *SortedRowContainer) SpillToDisk() {
err := c.Sort()
c.RowContainer.spillToDisk(err)
}

func (c *SortedRowContainer) hasEnoughDataToSpill(t *memory.Tracker) bool {
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
return c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10
}

// Add appends a chunk into the SortedRowContainer.
Expand All @@ -526,8 +584,8 @@ func (c *SortedRowContainer) GetSortedRow(idx int) (Row, error) {
func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
if c.actionSpill == nil {
c.actionSpill = &SortAndSpillDiskAction{
c: c,
SpillDiskAction: c.RowContainer.ActionSpill(),
c: c,
baseSpillDiskAction: c.RowContainer.ActionSpill().baseSpillDiskAction,
}
}
return c.actionSpill
Expand All @@ -536,8 +594,8 @@ func (c *SortedRowContainer) ActionSpill() *SortAndSpillDiskAction {
// ActionSpillForTest returns a SortAndSpillDiskAction for sorting and spilling over to disk for test.
func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction {
c.actionSpill = &SortAndSpillDiskAction{
c: c,
SpillDiskAction: c.RowContainer.ActionSpillForTest(),
c: c,
baseSpillDiskAction: c.RowContainer.ActionSpillForTest().baseSpillDiskAction,
}
return c.actionSpill
}
Expand All @@ -547,45 +605,13 @@ func (c *SortedRowContainer) ActionSpillForTest() *SortAndSpillDiskAction {
// triggered.
type SortAndSpillDiskAction struct {
c *SortedRowContainer
*SpillDiskAction
*baseSpillDiskAction
}

// Action sends a signal to trigger sortAndSpillToDisk method of RowContainer
// and if it is already triggered before, call its fallbackAction.
func (a *SortAndSpillDiskAction) Action(t *memory.Tracker) {
a.m.Lock()
defer a.m.Unlock()
// Guarantee that each partition size is at least 10% of the threshold, to avoid opening too many files.
if a.getStatus() == notSpilled && a.c.GetMemTracker().BytesConsumed() > t.GetBytesLimit()/10 {
a.once.Do(func() {
logutil.BgLogger().Info("memory exceeds quota, spill to disk now.",
zap.Int64("consumed", t.BytesConsumed()), zap.Int64("quota", t.GetBytesLimit()))
if a.testSyncInputFunc != nil {
a.testSyncInputFunc()
c := a.c
go func() {
c.sortAndSpillToDisk()
a.testSyncOutputFunc()
}()
return
}
go a.c.sortAndSpillToDisk()
})
return
}

a.cond.L.Lock()
for a.cond.status == spilling {
a.cond.Wait()
}
a.cond.L.Unlock()

if !t.CheckExceed() {
return
}
if fallback := a.GetFallback(); fallback != nil {
fallback.Action(t)
}
a.action(t, a.c)
}

// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
Expand Down
68 changes: 68 additions & 0 deletions util/chunk/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package chunk
import (
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -228,3 +229,70 @@ func TestRowContainerResetAndAction(t *testing.T) {
rc.actionSpill.WaitForTest()
require.Greater(t, rc.GetDiskTracker().BytesConsumed(), int64(0))
}

func TestPanicWhenSpillToDisk(t *testing.T) {
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
sz := 20
chk := NewChunkWithCapacity(fields, sz)
for i := 0; i < sz; i++ {
chk.AppendInt64(0, int64(i))
}

rc := NewRowContainer(fields, sz)
tracker := rc.GetMemTracker()
tracker.SetBytesLimit(chk.MemoryUsage() + 1)
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, rc.Add(chk))
rc.actionSpill.WaitForTest()
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/spillToDiskOutOfDiskQuota"))
}()
require.NoError(t, rc.Add(chk))
rc.actionSpill.WaitForTest()
require.True(t, rc.AlreadySpilledSafeForTest())

_, err := rc.GetRow(RowPtr{})
require.EqualError(t, err, "out of disk quota when spilling")
require.EqualError(t, rc.Add(chk), "out of disk quota when spilling")
}

func TestPanicDuringSortedRowContainerSpill(t *testing.T) {
fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
byItemsDesc := []bool{false}
keyColumns := []int{0}
keyCmpFuncs := []CompareFunc{cmpInt64}
sz := 20
rc := NewSortedRowContainer(fields, sz, byItemsDesc, keyColumns, keyCmpFuncs)

chk := NewChunkWithCapacity(fields, sz)
for i := 0; i < sz; i++ {
chk.AppendInt64(0, int64(i))
}
var tracker *memory.Tracker
var err error
tracker = rc.GetMemTracker()
tracker.SetBytesLimit(chk.MemoryUsage() + int64(8*chk.NumRows()) + 1)
tracker.FallbackOldAndSetNewAction(rc.ActionSpillForTest())
require.False(t, rc.AlreadySpilledSafeForTest())
err = rc.Add(chk)
require.NoError(t, err)
rc.actionSpill.WaitForTest()
require.False(t, rc.AlreadySpilledSafeForTest())

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/chunk/errorDuringSortRowContainer"))
}()
err = rc.Add(chk)
require.NoError(t, err)
rc.actionSpill.WaitForTest()
require.True(t, rc.AlreadySpilledSafeForTest())

_, err = rc.GetRow(RowPtr{})
require.EqualError(t, err, "sort meet error")
}

0 comments on commit a099a2a

Please sign in to comment.