util: add PreAlloc4Row and Insert for Chunk and List (#7916)
XuHuaiyu authored and zz-jason committed Oct 18, 2018
1 parent bfc12cd commit 562b917
Showing 4 changed files with 246 additions and 0 deletions.
69 changes: 69 additions & 0 deletions util/chunk/chunk.go
@@ -15,6 +15,7 @@ package chunk

import (
"encoding/binary"
"reflect"
"unsafe"

"github.com/cznic/mathutil"
@@ -277,6 +278,74 @@ func (c *Chunk) AppendPartialRow(colIdx int, row Row) {
}
}

// PreAlloc pre-allocates the memory space in a Chunk to store the Row.
// NOTE:
// 1. The Chunk must be empty or hold no useful data.
// 2. The schema of the Row must be the same as that of the Chunk.
// 3. This API is paired with the `Insert()` function, which inserts the row
// data into the Chunk after the pre-allocation.
// 4. We set the null bitmap here instead of in the Insert() function because
// when Insert() is called concurrently, the data race on a shared byte cannot
// be avoided even though the goroutines manipulate different bits inside that
// byte.
func (c *Chunk) PreAlloc(row Row) {
for i, srcCol := range row.c.columns {
dstCol := c.columns[i]
dstCol.appendNullBitmap(!srcCol.isNull(row.idx))
elemLen := len(srcCol.elemBuf)
if !srcCol.isFixed() {
elemLen = int(srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx])
dstCol.offsets = append(dstCol.offsets, int32(len(dstCol.data)+elemLen))
}
dstCol.length++
needCap := len(dstCol.data) + elemLen
if needCap <= cap(dstCol.data) {
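// The existing capacity suffices: extend the length of dstCol.data in place
// through its slice header instead of reallocating.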
(*reflect.SliceHeader)(unsafe.Pointer(&dstCol.data)).Len = len(dstCol.data) + elemLen
continue
}
// Grow the capacity according to golang.growslice.
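// For example, with len(dstCol.data) = 64, cap(dstCol.data) = 64 and
// elemLen = 8, needCap is 72 and doubleCap is 128; the column is still under
// 1024 bytes, so the slice is reallocated with capacity 128. Once a column
// holds at least 1024 bytes, the capacity grows by a quarter per step until
// it covers needCap.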
newCap := cap(dstCol.data)
doubleCap := newCap << 1
if needCap > doubleCap {
newCap = needCap
} else {
if len(dstCol.data) < 1024 {
newCap = doubleCap
} else {
for 0 < newCap && newCap < needCap {
newCap += newCap / 4
}
if newCap <= 0 {
newCap = needCap
}
}
}
dstCol.data = make([]byte, len(dstCol.data)+elemLen, newCap)
}
}

// Insert inserts `row` at the position specified by `rowIdx`.
// Note: Insert overwrites the existing data at that position, so it should be
// called after PreAlloc.
func (c *Chunk) Insert(rowIdx int, row Row) {
for i, srcCol := range row.c.columns {
if row.IsNull(i) {
continue
}
dstCol := c.columns[i]
var srcStart, srcEnd, destStart, destEnd int
if srcCol.isFixed() {
srcElemLen, destElemLen := len(srcCol.elemBuf), len(dstCol.elemBuf)
srcStart, destStart = row.idx*srcElemLen, rowIdx*destElemLen
srcEnd, destEnd = srcStart+srcElemLen, destStart+destElemLen
} else {
srcStart, srcEnd = int(srcCol.offsets[row.idx]), int(srcCol.offsets[row.idx+1])
destStart, destEnd = int(dstCol.offsets[rowIdx]), int(dstCol.offsets[rowIdx+1])
}
copy(dstCol.data[destStart:destEnd], srcCol.data[srcStart:srcEnd])
}
}
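
To make the pairing concrete, here is a minimal usage sketch (illustrative only, not part of this diff) built around a hypothetical helper named copyChunkConcurrently; it assumes it lives alongside the chunk package with the `sync` and `types` imports in place. One goroutine pre-allocates every row, then workers copy the row data in parallel:

// copyChunkConcurrently is a hypothetical helper, shown only to illustrate
// how PreAlloc and Insert are meant to be paired.
func copyChunkConcurrently(src *Chunk, fieldTypes []*types.FieldType) *Chunk {
	dst := NewChunkWithCapacity(fieldTypes, src.NumRows())
	// Phase 1 (single goroutine): reserve the space and set the null bitmap
	// for every row.
	for i := 0; i < src.NumRows(); i++ {
		dst.PreAlloc(src.GetRow(i))
	}
	// Phase 2 (many goroutines): each Insert writes only the bytes of its own
	// row, which is why the null bitmap had to be set up front.
	var wg sync.WaitGroup
	for i := 0; i < src.NumRows(); i++ {
		wg.Add(1)
		go func(rowIdx int) {
			defer wg.Done()
			dst.Insert(rowIdx, src.GetRow(rowIdx))
		}(i)
	}
	wg.Wait()
	return dst
}

The parallel part of the test below exercises essentially this pattern, gated by a WaitGroup so that all Insert calls run concurrently.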

// Append appends rows in [begin, end) in another Chunk to a Chunk.
func (c *Chunk) Append(other *Chunk, begin, end int) {
for colID, src := range other.columns {
103 changes: 103 additions & 0 deletions util/chunk/chunk_test.go
@@ -18,6 +18,8 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"testing"
"time"
"unsafe"
@@ -517,6 +519,107 @@ func (s *testChunkSuite) TestSwapColumn(c *check.C) {
checkRef()
}

func (s *testChunkSuite) TestPreAlloc4RowAndInsert(c *check.C) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})

srcChk := NewChunkWithCapacity(fieldTypes, 10)
for i := int64(0); i < 10; i++ {
srcChk.AppendFloat32(0, float32(i))
srcChk.AppendInt64(1, i)
srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
}

destChk := NewChunkWithCapacity(fieldTypes, 3)

// Test Chunk.PreAlloc.
for i := 0; i < srcChk.NumRows(); i++ {
c.Assert(destChk.NumRows(), check.Equals, i)
destChk.PreAlloc(srcChk.GetRow(i))
}
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]
c.Assert(len(srcCol.elemBuf), check.Equals, len(destCol.elemBuf))
c.Assert(len(srcCol.data), check.Equals, len(destCol.data))
c.Assert(len(srcCol.offsets), check.Equals, len(destCol.offsets))
c.Assert(len(srcCol.nullBitmap), check.Equals, len(destCol.nullBitmap))
c.Assert(srcCol.length, check.Equals, destCol.length)
c.Assert(srcCol.nullCount, check.Equals, destCol.nullCount)

for _, val := range destCol.data {
c.Assert(val == 0, check.IsTrue)
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}

// Test Chunk.Insert.
for i := srcChk.NumRows() - 1; i >= 0; i-- {
destChk.Insert(i, srcChk.GetRow(i))
}
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]

for j, val := range srcCol.data {
c.Assert(val, check.Equals, destCol.data[j])
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}

// Test parallel Chunk.Insert.
destChk.Reset()
startWg, endWg := &sync.WaitGroup{}, &sync.WaitGroup{}
startWg.Add(1)
for i := 0; i < srcChk.NumRows(); i++ {
destChk.PreAlloc(srcChk.GetRow(i))
endWg.Add(1)
go func(rowIdx int) {
defer func() {
endWg.Done()
}()
startWg.Wait()
destChk.Insert(rowIdx, srcChk.GetRow(rowIdx))
}(i)
}
startWg.Done()
endWg.Wait()
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]

for j, val := range srcCol.data {
c.Assert(val, check.Equals, destCol.data[j])
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}
}

func BenchmarkAppendInt(b *testing.B) {
b.ReportAllocs()
chk := newChunk(8)
31 changes: 31 additions & 0 deletions util/chunk/list.go
@@ -140,6 +140,37 @@ func (l *List) Reset() {
l.consumedIdx = -1
}

// PreAlloc4Row pre-allocates the storage memory for a Row.
// NOTE:
// 1. The List must be empty or hold no useful data.
// 2. The schema of the Row must be the same as that of the List.
// 3. This API is paired with the `Insert()` function, which inserts the row
// data into the List after the pre-allocation.
func (l *List) PreAlloc4Row(row Row) (ptr RowPtr) {
chkIdx := len(l.chunks) - 1
if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() {
newChk := l.allocChunk()
l.chunks = append(l.chunks, newChk)
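// If there is a previous chunk whose memory has not been tracked yet,
// account for it before switching to the newly allocated chunk.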
if chkIdx != l.consumedIdx {
l.memTracker.Consume(l.chunks[chkIdx].MemoryUsage())
l.consumedIdx = chkIdx
}
chkIdx++
}
chk := l.chunks[chkIdx]
rowIdx := chk.NumRows()
chk.PreAlloc(row)
l.length++
return RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}
}

// Insert inserts `row` at the position specified by `ptr`.
// Note: Insert overwrites the existing data at that position, so it should be
// called after PreAlloc4Row.
func (l *List) Insert(ptr RowPtr, row Row) {
l.chunks[ptr.ChkIdx].Insert(int(ptr.RowIdx), row)
}
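
A similarly hedged sketch for the List API (again illustrative, not part of this diff, with the same assumed imports): a hypothetical helper reserves one slot per source row with PreAlloc4Row, keeps the returned RowPtrs, and later fills each slot with Insert:

// copyChunkIntoList is a hypothetical helper illustrating the
// PreAlloc4Row/Insert pairing on a List.
func copyChunkIntoList(src *Chunk, fieldTypes []*types.FieldType) *List {
	dst := NewList(fieldTypes, src.NumRows(), src.NumRows())
	ptrs := make([]RowPtr, 0, src.NumRows())
	for i := 0; i < src.NumRows(); i++ {
		// Reserve a slot and remember where the row will live.
		ptrs = append(ptrs, dst.PreAlloc4Row(src.GetRow(i)))
	}
	for i, ptr := range ptrs {
		// Fill the reserved slot; every call writes a distinct row.
		dst.Insert(ptr, src.GetRow(i))
	}
	return dst
}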

// ListWalkFunc is used to walk the list.
// If error is returned, it will stop walking.
type ListWalkFunc = func(row Row) error
43 changes: 43 additions & 0 deletions util/chunk/list_test.go
@@ -15,6 +15,8 @@ package chunk

import (
"math"
"strconv"
"strings"
"testing"
"time"

@@ -114,6 +116,47 @@ func (s *testChunkSuite) TestListMemoryUsage(c *check.C) {
c.Assert(list.GetMemTracker().BytesConsumed(), check.Equals, memUsage+srcChk.MemoryUsage())
}

func (s *testChunkSuite) TestListPreAlloc4RowAndInsert(c *check.C) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})

srcChk := NewChunkWithCapacity(fieldTypes, 10)
for i := int64(0); i < 10; i++ {
srcChk.AppendFloat32(0, float32(i))
srcChk.AppendInt64(1, i)
srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
}

srcList := NewList(fieldTypes, 3, 3)
destList := NewList(fieldTypes, 5, 5)
destRowPtr := make([]RowPtr, srcChk.NumRows())
for i := 0; i < srcChk.NumRows(); i++ {
srcList.AppendRow(srcChk.GetRow(i))
destRowPtr[i] = destList.PreAlloc4Row(srcChk.GetRow(i))
}

c.Assert(srcList.NumChunks(), check.Equals, 4)
c.Assert(destList.NumChunks(), check.Equals, 2)

iter4Src := NewIterator4List(srcList)
for row, i := iter4Src.Begin(), 0; row != iter4Src.End(); row, i = iter4Src.Next(), i+1 {
destList.Insert(destRowPtr[i], row)
}

iter4Dest := NewIterator4List(destList)
srcRow, destRow := iter4Src.Begin(), iter4Dest.Begin()
for ; srcRow != iter4Src.End(); srcRow, destRow = iter4Src.Next(), iter4Dest.Next() {
c.Assert(srcRow.GetFloat32(0), check.Equals, destRow.GetFloat32(0))
c.Assert(srcRow.GetInt64(1), check.Equals, destRow.GetInt64(1))
c.Assert(srcRow.GetMyDecimal(2).Compare(destRow.GetMyDecimal(2)) == 0, check.IsTrue)
c.Assert(srcRow.GetString(3), check.Equals, destRow.GetString(3))
}
}

func BenchmarkListMemoryUsage(b *testing.B) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})