-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
util: add PreAlloc4Row and Insert for Chunk and List #7916
Changes from 12 commits
af650d0
577f676
837a517
6eb77ad
3b239b5
f91ebc0
79e6eb0
52497c0
b73c2e9
0ac6476
f5e4088
af8fc82
e59f8d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ package chunk | |
|
||
import ( | ||
"encoding/binary" | ||
"reflect" | ||
"unsafe" | ||
|
||
"github.com/cznic/mathutil" | ||
|
@@ -277,6 +278,74 @@ func (c *Chunk) AppendPartialRow(colIdx int, row Row) { | |
} | ||
} | ||
|
||
// PreAlloc pre-allocates the memory space in a Chunk to store the Row. | ||
// NOTE: | ||
// 1. The Chunk must be empty or holds no useful data. | ||
// 2. The schema of the Row must be the same with the Chunk. | ||
// 3. This API is paired with the `Insert()` function, which inserts all the | ||
// rows data into the Chunk after the pre-allocation. | ||
// 4. We set the null bitmap here instead of in the Insert() function because | ||
// when the Insert() function is called parallelly, the data race on a byte | ||
// can not be avoided although the manipulated bits are different inside a | ||
// byte. | ||
func (c *Chunk) PreAlloc(row Row) { | ||
for i, srcCol := range row.c.columns { | ||
dstCol := c.columns[i] | ||
dstCol.appendNullBitmap(!srcCol.isNull(row.idx)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's better to only pre-allocate the memory for null bitmap by calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We write the nullBitMap info here is because that, |
||
elemLen := len(srcCol.elemBuf) | ||
if !srcCol.isFixed() { | ||
elemLen = int(srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx]) | ||
dstCol.offsets = append(dstCol.offsets, int32(len(dstCol.data)+elemLen)) | ||
} | ||
dstCol.length++ | ||
needCap := len(dstCol.data) + elemLen | ||
alivxxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if needCap <= cap(dstCol.data) { | ||
(*reflect.SliceHeader)(unsafe.Pointer(&dstCol.data)).Len = len(dstCol.data) + elemLen | ||
continue | ||
} | ||
// Grow the capacity according to golang.growslice. | ||
newCap := cap(dstCol.data) | ||
doubleCap := newCap << 1 | ||
if needCap > doubleCap { | ||
newCap = needCap | ||
} else { | ||
if len(dstCol.data) < 1024 { | ||
newCap = doubleCap | ||
} else { | ||
for 0 < newCap && newCap < needCap { | ||
newCap += newCap / 4 | ||
} | ||
if newCap <= 0 { | ||
newCap = needCap | ||
} | ||
} | ||
} | ||
dstCol.data = make([]byte, len(dstCol.data)+elemLen, newCap) | ||
} | ||
} | ||
|
||
// Insert inserts `row` on the position specified by `rowIdx`. | ||
// Note: Insert will cover the origin data, it should be called after | ||
// PreAlloc. | ||
func (c *Chunk) Insert(rowIdx int, row Row) { | ||
for i, srcCol := range row.c.columns { | ||
if row.IsNull(i) { | ||
continue | ||
} | ||
dstCol := c.columns[i] | ||
var srcStart, srcEnd, destStart, destEnd int | ||
if srcCol.isFixed() { | ||
srcElemLen, destElemLen := len(srcCol.elemBuf), len(dstCol.elemBuf) | ||
alivxxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
srcStart, destStart = row.idx*srcElemLen, rowIdx*destElemLen | ||
srcEnd, destEnd = srcStart+srcElemLen, destStart+destElemLen | ||
alivxxx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
srcStart, srcEnd = int(srcCol.offsets[row.idx]), int(srcCol.offsets[row.idx+1]) | ||
destStart, destEnd = int(dstCol.offsets[rowIdx]), int(dstCol.offsets[rowIdx+1]) | ||
} | ||
copy(dstCol.data[destStart:destEnd], srcCol.data[srcStart:srcEnd]) | ||
} | ||
} | ||
|
||
// Append appends rows in [begin, end) in another Chunk to a Chunk. | ||
func (c *Chunk) Append(other *Chunk, begin, end int) { | ||
for colID, src := range other.columns { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ import ( | |
"fmt" | ||
"math" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"testing" | ||
"time" | ||
"unsafe" | ||
|
@@ -517,6 +519,107 @@ func (s *testChunkSuite) TestSwapColumn(c *check.C) { | |
checkRef() | ||
} | ||
|
||
func (s *testChunkSuite) TestPreAlloc4RowAndInsert(c *check.C) { | ||
fieldTypes := make([]*types.FieldType, 0, 4) | ||
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat}) | ||
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong}) | ||
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal}) | ||
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar}) | ||
|
||
srcChk := NewChunkWithCapacity(fieldTypes, 10) | ||
for i := int64(0); i < 10; i++ { | ||
srcChk.AppendFloat32(0, float32(i)) | ||
srcChk.AppendInt64(1, i) | ||
srcChk.AppendMyDecimal(2, types.NewDecFromInt(i)) | ||
srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i))) | ||
} | ||
|
||
destChk := NewChunkWithCapacity(fieldTypes, 3) | ||
|
||
// Test Chunk.PreAlloc. | ||
for i := 0; i < srcChk.NumRows(); i++ { | ||
c.Assert(destChk.NumRows(), check.Equals, i) | ||
destChk.PreAlloc(srcChk.GetRow(i)) | ||
} | ||
for i, srcCol := range srcChk.columns { | ||
destCol := destChk.columns[i] | ||
c.Assert(len(srcCol.elemBuf), check.Equals, len(destCol.elemBuf)) | ||
c.Assert(len(srcCol.data), check.Equals, len(destCol.data)) | ||
c.Assert(len(srcCol.offsets), check.Equals, len(destCol.offsets)) | ||
c.Assert(len(srcCol.nullBitmap), check.Equals, len(destCol.nullBitmap)) | ||
c.Assert(srcCol.length, check.Equals, destCol.length) | ||
c.Assert(srcCol.nullCount, check.Equals, destCol.nullCount) | ||
|
||
for _, val := range destCol.data { | ||
c.Assert(val == 0, check.IsTrue) | ||
} | ||
for j, val := range srcCol.offsets { | ||
c.Assert(val, check.Equals, destCol.offsets[j]) | ||
} | ||
for j, val := range srcCol.nullBitmap { | ||
c.Assert(val, check.Equals, destCol.nullBitmap[j]) | ||
} | ||
for _, val := range destCol.elemBuf { | ||
c.Assert(val == 0, check.IsTrue) | ||
} | ||
} | ||
|
||
// Test Chunk.Insert. | ||
for i := srcChk.NumRows() - 1; i >= 0; i-- { | ||
destChk.Insert(i, srcChk.GetRow(i)) | ||
} | ||
for i, srcCol := range srcChk.columns { | ||
destCol := destChk.columns[i] | ||
|
||
for j, val := range srcCol.data { | ||
c.Assert(val, check.Equals, destCol.data[j]) | ||
} | ||
for j, val := range srcCol.offsets { | ||
c.Assert(val, check.Equals, destCol.offsets[j]) | ||
} | ||
for j, val := range srcCol.nullBitmap { | ||
c.Assert(val, check.Equals, destCol.nullBitmap[j]) | ||
} | ||
for _, val := range destCol.elemBuf { | ||
c.Assert(val == 0, check.IsTrue) | ||
} | ||
} | ||
|
||
// Test parallel Chunk.Insert. | ||
destChk.Reset() | ||
startWg, endWg := &sync.WaitGroup{}, &sync.WaitGroup{} | ||
startWg.Add(1) | ||
for i := 0; i < srcChk.NumRows(); i++ { | ||
destChk.PreAlloc(srcChk.GetRow(i)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. emm..I have a question too, there are any potential or direct There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just a question not review... I read too slow 🤣 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lysu Direct |
||
endWg.Add(1) | ||
go func(rowIdx int) { | ||
defer func() { | ||
endWg.Done() | ||
}() | ||
startWg.Wait() | ||
destChk.Insert(rowIdx, srcChk.GetRow(rowIdx)) | ||
}(i) | ||
} | ||
startWg.Done() | ||
endWg.Wait() | ||
for i, srcCol := range srcChk.columns { | ||
destCol := destChk.columns[i] | ||
|
||
for j, val := range srcCol.data { | ||
c.Assert(val, check.Equals, destCol.data[j]) | ||
} | ||
for j, val := range srcCol.offsets { | ||
c.Assert(val, check.Equals, destCol.offsets[j]) | ||
} | ||
for j, val := range srcCol.nullBitmap { | ||
c.Assert(val, check.Equals, destCol.nullBitmap[j]) | ||
} | ||
for _, val := range destCol.elemBuf { | ||
c.Assert(val == 0, check.IsTrue) | ||
} | ||
} | ||
} | ||
|
||
func BenchmarkAppendInt(b *testing.B) { | ||
b.ReportAllocs() | ||
chk := newChunk(8) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,6 +140,37 @@ func (l *List) Reset() { | |
l.consumedIdx = -1 | ||
} | ||
|
||
// PreAlloc4Row pre-allocates the storage memory for a Row. | ||
// NOTE: | ||
// 1. The List must be empty or holds no useful data. | ||
// 2. The schema of the Row must be the same with the List. | ||
// 3. This API is paired with the `Insert()` function, which inserts all the | ||
// rows data into the List after the pre-allocation. | ||
func (l *List) PreAlloc4Row(row Row) (ptr RowPtr) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the comment of this function should also be updated. |
||
chkIdx := len(l.chunks) - 1 | ||
if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() { | ||
newChk := l.allocChunk() | ||
l.chunks = append(l.chunks, newChk) | ||
if chkIdx != l.consumedIdx { | ||
l.memTracker.Consume(l.chunks[chkIdx].MemoryUsage()) | ||
l.consumedIdx = chkIdx | ||
} | ||
chkIdx++ | ||
} | ||
chk := l.chunks[chkIdx] | ||
rowIdx := chk.NumRows() | ||
chk.PreAlloc(row) | ||
l.length++ | ||
return RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)} | ||
} | ||
|
||
// Insert inserts `row` on the position specified by `ptr`. | ||
// Note: Insert will cover the origin data, it should be called after | ||
// PreAlloc. | ||
func (l *List) Insert(ptr RowPtr, row Row) { | ||
l.chunks[ptr.ChkIdx].Insert(int(ptr.RowIdx), row) | ||
} | ||
|
||
// ListWalkFunc is used to walk the list. | ||
// If error is returned, it will stop walking. | ||
type ListWalkFunc = func(row Row) error | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about adding another NOTE in the comment: