Skip to content

Commit

Permalink
cherry pick pingcap#32554 to release-5.3
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
wshwsh12 authored and ti-srebot committed Feb 23, 2022
1 parent d82ee69 commit e06bc52
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 0 deletions.
10 changes: 10 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,11 @@ func createSessionFunc(store kv.Storage) pools.Factory {
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
<<<<<<< HEAD
=======
// Internal session uses default format to prevent memory leak problem.
se.sessionVars.EnableChunkRPC = false
>>>>>>> cce1ebdeb... util: avoid column allocator reuse the column hold huge memory (#32554)
return se, nil
}
}
Expand All @@ -1034,6 +1039,11 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
<<<<<<< HEAD
=======
// Internal session uses default format to prevent memory leak problem.
se.sessionVars.EnableChunkRPC = false
>>>>>>> cce1ebdeb... util: avoid column allocator reuse the column hold huge memory (#32554)
return se, nil
}
}
Expand Down
158 changes: 158 additions & 0 deletions util/chunk/alloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunk

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/types"
)

// Allocator is an interface defined to reduce object allocation.
// The typical usage is to call Reset() to recycle objects into a pool,
// and Alloc() to allocate from that pool.
type Allocator interface {
// Alloc returns a chunk holding one column per entry of fields.
// The implementation in this file uses min(capacity, maxChunkSize) as the
// chunk capacity and maxChunkSize as the number of required rows.
Alloc(fields []*types.FieldType, capacity, maxChunkSize int) *Chunk
// Reset recycles everything handed out by Alloc since the previous Reset.
Reset()
}

// NewAllocator builds an empty allocator whose column pool is initialised
// and ready to accept recycled columns.
func NewAllocator() *allocator {
	a := new(allocator)
	a.columnAlloc.init()
	return a
}

// Compile-time check that *allocator satisfies the Allocator interface.
var _ Allocator = &allocator{}

// allocator tries to reuse objects.
// It uses `poolColumnAllocator` to allocate chunk column objects.
// The allocated chunks are recorded in the `allocated` slice.
// After Reset(), those chunks are decoupled into chunk column objects, which
// are put back into `poolColumnAllocator` for reuse.
type allocator struct {
// allocated records every chunk returned by Alloc since the last Reset.
allocated []*Chunk
// free holds reset chunks that Alloc may hand out again; Reset keeps at
// most maxFreeChunks of them.
free []*Chunk
// columnAlloc supplies (and takes back) the columns of those chunks.
columnAlloc poolColumnAllocator
}

// Alloc implements the Allocator interface.
// It takes a chunk from the free list when one is available (otherwise it
// creates a new one), fills it with one column per field obtained from the
// pooled column allocator, and records it so that Reset can recycle it.
func (a *allocator) Alloc(fields []*types.FieldType, capacity, maxChunkSize int) *Chunk {
	var chk *Chunk
	if last := len(a.free) - 1; last >= 0 {
		// Reuse the most recently freed chunk.
		chk, a.free = a.free[last], a.free[:last]
	} else {
		chk = &Chunk{columns: make([]*Column, 0, len(fields))}
	}

	// The capacity never exceeds maxChunkSize.
	chk.capacity = mathutil.Min(capacity, maxChunkSize)
	chk.requiredRows = maxChunkSize

	// Every column comes from the pool column allocator.
	for i := range fields {
		col := a.columnAlloc.NewColumn(fields[i], chk.capacity)
		chk.columns = append(chk.columns, col)
	}

	a.allocated = append(a.allocated, chk)
	return chk
}

const (
// maxFreeChunks caps the number of reset chunks that allocator.Reset keeps
// in the free list.
maxFreeChunks = 64
// maxFreeColumnsPerType caps the number of columns a single freeList
// accepts; freeList.push ignores columns beyond this limit.
maxFreeColumnsPerType = 256
)

// Reset implements the Allocator interface.
// The free list is rebuilt from scratch out of the chunks allocated since the
// previous Reset: each chunk gives its columns back to the column allocator,
// is reset, and is then kept for reuse unless the free list is already full.
func (a *allocator) Reset() {
	a.free = a.free[:0]
	for idx := range a.allocated {
		chk := a.allocated[idx]
		// Drop the reference held by the `allocated` slice.
		a.allocated[idx] = nil

		// Hand the columns back to the column allocator before the chunk
		// itself is reset.
		for _, col := range chk.columns {
			a.columnAlloc.put(col)
		}
		chk.resetForReuse()

		// Don't cache too much data.
		if len(a.free) >= maxFreeChunks {
			continue
		}
		a.free = append(a.free, chk)
	}
	a.allocated = a.allocated[:0]
}

// Compile-time check that *poolColumnAllocator satisfies ColumnAllocator.
var _ ColumnAllocator = &poolColumnAllocator{}

// poolColumnAllocator is a ColumnAllocator that serves columns from free
// lists of recycled columns before falling back to creating new ones.
type poolColumnAllocator struct {
// pool maps a column type size (the value of getFixedLen / Column.typeSize)
// to the free list of recycled columns of that size.
pool map[int]freeList
}

// NewColumn implements the ColumnAllocator interface.
// A recycled column with the same type size as ft is returned as-is when the
// pool has one; count is only used when a brand-new column has to be created.
func (alloc *poolColumnAllocator) NewColumn(ft *types.FieldType, count int) *Column {
	size := getFixedLen(ft)
	// A missing entry yields a nil free list, which reports itself as empty.
	if recycled := alloc.pool[size]; !recycled.empty() {
		return recycled.pop()
	}
	return newColumn(size, count)
}

// init prepares an empty pool; it must be called before put, because put
// writes into the pool map.
func (alloc *poolColumnAllocator) init() {
	alloc.pool = map[int]freeList{}
}

// put offers col to the pool for later reuse.
// Columns flagged with avoidReusing and columns whose type size is not
// positive are never pooled.
func (alloc *poolColumnAllocator) put(col *Column) {
	if col.avoidReusing {
		return
	}
	size := col.typeSize()
	if size <= 0 {
		return
	}

	// Lazily create the free list for this type size.
	bucket := alloc.pool[size]
	if bucket == nil {
		bucket = make(freeList, 8)
		alloc.pool[size] = bucket
	}
	bucket.push(col)
}

// freeList is defined as a map (used as a set) rather than a list because,
// when recycling chunk columns, the same column can be offered more than once:
// some chunk columns are just references to others. Keying by pointer makes
// such duplicates collapse into a single entry.
type freeList map[*Column]struct{}

// empty reports whether the free list holds no column; a nil list is empty.
func (l freeList) empty() bool {
	return len(l) < 1
}

// pop removes one column from the free list and returns it.
// Which column is chosen follows map iteration order; nil is returned when
// the list is empty.
func (l freeList) pop() *Column {
	var (
		picked *Column
		found  bool
	)
	for picked = range l {
		found = true
		break
	}
	if found {
		delete(l, picked)
	}
	return picked
}

// push adds c to the free list unless the list already holds
// maxFreeColumnsPerType columns, in which case c is silently dropped so that
// the cache does not hold on to too much memory.
func (l freeList) push(c *Column) {
	if len(l) < maxFreeColumnsPerType {
		l[c] = struct{}{}
	}
}
204 changes: 204 additions & 0 deletions util/chunk/alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunk

import (
"testing"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)

// TestAllocator checks that chunks produced by the allocator have the
// expected column layout, both when freshly allocated and after a
// Reset/Alloc cycle, and that Reset never keeps more than maxFreeChunks
// chunks in the free list.
func TestAllocator(t *testing.T) {
	alloc := NewAllocator()

	fieldTypes := []*types.FieldType{
		{Tp: mysql.TypeVarchar},
		{Tp: mysql.TypeJSON},
		{Tp: mysql.TypeFloat},
		{Tp: mysql.TypeNewDecimal},
		{Tp: mysql.TypeDouble},
		{Tp: mysql.TypeLonglong},
		{Tp: mysql.TypeTimestamp},
		{Tp: mysql.TypeDatetime},
	}

	initCap := 5
	maxChunkSize := 100

	chk := alloc.Alloc(fieldTypes, initCap, maxChunkSize)
	require.NotNil(t, chk)
	// check reads chk through the closure, so it validates whichever chunk
	// is currently assigned to chk.
	check := func() {
		require.Equal(t, len(fieldTypes), chk.NumCols())
		// The varchar and JSON columns are expected to have no element buffer.
		require.Nil(t, chk.columns[0].elemBuf)
		require.Nil(t, chk.columns[1].elemBuf)
		// The remaining columns have an element buffer of the fixed length
		// and a data buffer sized for initCap elements.
		for i := 2; i < len(fieldTypes); i++ {
			fixedLen := getFixedLen(fieldTypes[i])
			require.Equal(t, fixedLen, len(chk.columns[i].elemBuf))
			require.Equal(t, initCap*fixedLen, cap(chk.columns[i].data))
		}
	}
	check()

	// Call Reset and alloc again, check the result.
	alloc.Reset()
	chk = alloc.Alloc(fieldTypes, initCap, maxChunkSize)
	check()

	// Check that the free list is capped at maxFreeChunks.
	for i := 0; i < maxFreeChunks+10; i++ {
		alloc.Alloc(fieldTypes, initCap, maxChunkSize)
	}
	alloc.Reset()
	require.Len(t, alloc.free, maxFreeChunks)
}

// TestColumnAllocator checks that the pooled allocator creates the same
// columns as NewColumn and DefaultColumnAllocator, and that a free list never
// grows beyond maxFreeColumnsPerType.
func TestColumnAllocator(t *testing.T) {
	fieldTypes := []*types.FieldType{
		{Tp: mysql.TypeVarchar},
		{Tp: mysql.TypeJSON},
		{Tp: mysql.TypeFloat},
		{Tp: mysql.TypeNewDecimal},
		{Tp: mysql.TypeDouble},
		{Tp: mysql.TypeLonglong},
		{Tp: mysql.TypeTimestamp},
		{Tp: mysql.TypeDatetime},
	}

	var pooled poolColumnAllocator
	pooled.init()
	var plain DefaultColumnAllocator

	// With an empty pool, all three ways of creating a column must agree.
	initCap := 5
	for i := range fieldTypes {
		want := NewColumn(fieldTypes[i], initCap)
		fromPool := pooled.NewColumn(fieldTypes[i], initCap)
		fromDefault := plain.NewColumn(fieldTypes[i], initCap)
		require.Equal(t, want, fromPool)
		require.Equal(t, fromPool, fromDefault)
	}

	// Create more columns of one type than a free list may hold, then give
	// all of them back.
	ft := fieldTypes[2]
	total := maxFreeColumnsPerType + 10
	cols := make([]*Column, 0, total)
	for len(cols) < total {
		cols = append(cols, pooled.NewColumn(ft, 20))
	}
	for i := range cols {
		pooled.put(cols[i])
	}

	// Only maxFreeColumnsPerType of them may have been kept.
	kept := pooled.pool[getFixedLen(ft)]
	require.NotNil(t, kept)
	require.Len(t, kept, maxFreeColumnsPerType)
}

// TestNoDuplicateColumnReuse is a regression test for
// https://github.com/pingcap/tidb/issues/29554.
// Some chunk columns are just references to other chunk columns, so when a
// chunk is recycled several of its columns may point to the same memory
// address; the pool must not hand such a column out twice.
func TestNoDuplicateColumnReuse(t *testing.T) {
	fieldTypes := []*types.FieldType{
		{Tp: mysql.TypeVarchar},
		{Tp: mysql.TypeJSON},
		{Tp: mysql.TypeFloat},
		{Tp: mysql.TypeNewDecimal},
		{Tp: mysql.TypeDouble},
		{Tp: mysql.TypeLonglong},
		{Tp: mysql.TypeTimestamp},
		{Tp: mysql.TypeDatetime},
	}
	alloc := NewAllocator()
	for n := maxFreeChunks + 10; n > 0; n-- {
		// MakeRef(1, 3) introduces a column reference inside the chunk.
		alloc.Alloc(fieldTypes, 5, 10).MakeRef(1, 3)
	}
	alloc.Reset()

	// Drain every free list and make sure no column comes out twice.
	for _, list := range alloc.columnAlloc.pool {
		seen := make(map[*Column]struct{})
		for !list.empty() {
			col := list.pop()
			_, exist := seen[col]
			require.False(t, exist)
			seen[col] = struct{}{}
		}
	}
}

// TestAvoidColumnReuse is a regression test for
// https://github.com/pingcap/tidb/issues/31981.
// Some chunk columns are references to an rpc message, so they must be
// ignored when a chunk is recycled.
func TestAvoidColumnReuse(t *testing.T) {
	fieldTypes := []*types.FieldType{
		{Tp: mysql.TypeVarchar},
		{Tp: mysql.TypeJSON},
		{Tp: mysql.TypeFloat},
		{Tp: mysql.TypeNewDecimal},
		{Tp: mysql.TypeDouble},
		{Tp: mysql.TypeLonglong},
		{Tp: mysql.TypeTimestamp},
		{Tp: mysql.TypeDatetime},
	}
	alloc := NewAllocator()
	for n := maxFreeChunks + 10; n > 0; n-- {
		chk := alloc.Alloc(fieldTypes, 5, 10)
		for idx := range chk.columns {
			chk.columns[idx].avoidReusing = true
		}
	}
	alloc.Reset()

	// Every column was flagged, so none of them may have reached the pool.
	for _, list := range alloc.columnAlloc.pool {
		require.True(t, list.empty())
	}

	// The decoder must set the avoidReusing flag on the columns it fills.
	chk := alloc.Alloc(fieldTypes, 5, 1024)
	for row := 0; row < 11; row++ {
		for _, col := range chk.columns {
			col.AppendNull()
		}
	}
	codec := &Codec{fieldTypes}
	buf := codec.Encode(chk)

	decoder := NewDecoder(
		NewChunkWithCapacity(fieldTypes, 0),
		fieldTypes,
	)
	decoder.Reset(buf)
	decoder.ReuseIntermChk(chk)
	for idx := range chk.columns {
		require.True(t, chk.columns[idx].avoidReusing)
	}
}
3 changes: 3 additions & 0 deletions util/chunk/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained

// decode data.
col.data = buffer[:numDataBytes:numDataBytes]
// The column references the data of the gRPC response, so the memory of the gRPC message cannot be GCed if we
// reuse this column. Thus, we set `avoidReusing` to true.
col.avoidReusing = true
return buffer[numDataBytes:]
}

Expand Down
Loading

0 comments on commit e06bc52

Please sign in to comment.