Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: enhancement for ListInDisk(support writing after reading) #24379

Merged
merged 16 commits into from
May 19, 2021
25 changes: 19 additions & 6 deletions util/checksum/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ package checksum

import (
"encoding/binary"
"errors"
"hash/crc32"
"io"
"sync"

"github.com/pingcap/errors"
)

const (
Expand All @@ -42,11 +43,12 @@ var checksumReaderBufPool = sync.Pool{
// | -- 4B -- | -- 1020B -- || -- 4B -- | -- 1020B -- || -- 4B -- | -- 60B -- |
// | -- checksum -- | -- payload -- || -- checksum -- | -- payload -- || -- checksum -- | -- payload -- |
type Writer struct {
err error
w io.WriteCloser
buf []byte
payload []byte
payloadUsed int
err error
w io.WriteCloser
buf []byte
payload []byte
payloadUsed int
flushedUserDataCnt int64
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
}

// NewWriter returns a new Writer which calculates and stores a CRC-32 checksum for the payload before
Expand Down Expand Up @@ -104,10 +106,21 @@ func (w *Writer) Flush() error {
w.err = err
return err
}
w.flushedUserDataCnt += int64(w.payloadUsed)
w.payloadUsed = 0
return nil
}

// GetCache returns the byte slice that holds the data not flushed to disk.
func (w *Writer) GetCache() []byte {
return w.payload[:w.payloadUsed]
}

// GetCacheDataOffset return the user data offset in cache.
func (w *Writer) GetCacheDataOffset() int64 {
return w.flushedUserDataCnt
}

// Close implements the io.Closer interface.
func (w *Writer) Close() (err error) {
err = w.Flush()
Expand Down
72 changes: 72 additions & 0 deletions util/checksum/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,3 +651,75 @@ func (s *testChecksumSuite) testTiCase3651and3652(c *check.C, encrypt bool) {
assertReadAt(0, make([]byte, 10200), nil, 10200, strings.Repeat("0123456789", 1020), f1)
assertReadAt(0, make([]byte, 10200), nil, 10200, strings.Repeat("0123456789", 1020), f2)
}

var checkFlushedData = func(c *check.C, f io.ReaderAt, off int64, readBufLen int, assertN int, assertErr interface{}, assertRes []byte) {
readBuf := make([]byte, readBufLen)
r := NewReader(f)
n, err := r.ReadAt(readBuf, off)
c.Assert(err, check.Equals, assertErr)
c.Assert(n, check.Equals, assertN)
c.Assert(bytes.Compare(readBuf, assertRes), check.Equals, 0)
}

func (s *testChecksumSuite) TestChecksumWriter(c *check.C) {
path := "checksum"
f, err := os.Create(path)
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, check.IsNil)
defer func() {
err = f.Close()
c.Assert(err, check.IsNil)
err = os.Remove(path)
c.Assert(err, check.IsNil)
}()

buf := bytes.NewBuffer(nil)
testData := "0123456789"
for i := 0; i < 100; i++ {
buf.WriteString(testData)
}

// write 1000 bytes and flush
w := NewWriter(f)
n, err := w.Write(buf.Bytes())
c.Assert(err, check.IsNil)
c.Assert(n, check.Equals, 1000)

err = w.Flush()
c.Assert(err, check.IsNil)
checkFlushedData(c, f, 0, 1000, 1000, nil, buf.Bytes())

// all data flushed, so no data in cache.
cacheOff := w.GetCacheDataOffset()
c.Assert(cacheOff, check.Equals, int64(1000))
}

func (s *testChecksumSuite) TestChecksumWriterAutoFlush(c *check.C) {
path := "checksum"
f, err := os.Create(path)
c.Assert(err, check.IsNil)
defer func() {
err = f.Close()
c.Assert(err, check.IsNil)
err = os.Remove(path)
c.Assert(err, check.IsNil)
}()

w := NewWriter(f)

buf := bytes.NewBuffer(nil)
testData := "0123456789"
for i := 0; i < 102; i++ {
buf.WriteString(testData)
}
n, err := w.Write(buf.Bytes())
c.Assert(err, check.IsNil)
c.Assert(n, check.Equals, len(buf.Bytes()))

// this write will trigger flush
n, err = w.Write([]byte("0"))
c.Assert(err, check.IsNil)
c.Assert(n, check.Equals, 1)
checkFlushedData(c, f, 0, 1020, 1020, nil, buf.Bytes())
cacheOff := w.GetCacheDataOffset()
c.Assert(cacheOff, check.Equals, int64(len(buf.Bytes())))
}
73 changes: 68 additions & 5 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type ListInDisk struct {
diskTracker *disk.Tracker // track disk usage.
numRowsInDisk int

checksumWriter *checksum.Writer
cipherWriter *encrypt.Writer

// ctrCipher stores the key and nonce using by aes encrypt io layer
ctrCipher *encrypt.CtrCipher
}
Expand Down Expand Up @@ -78,9 +81,11 @@ func (l *ListInDisk) initDiskFile() (err error) {
if err != nil {
return
}
underlying = encrypt.NewWriter(l.disk, l.ctrCipher)
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher)
underlying = l.cipherWriter
}
l.w = checksum.NewWriter(underlying)
l.checksumWriter = checksum.NewWriter(underlying)
l.w = l.checksumWriter
l.bufFlushMutex = sync.RWMutex{}
return
}
Expand Down Expand Up @@ -164,16 +169,16 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {

// GetRow gets a Row from the ListInDisk by RowPtr.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
err = l.flush()
if err != nil {
return
}
off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = encrypt.NewReader(l.disk, l.ctrCipher)
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
r := io.NewSectionReader(checksum.NewReader(underlying), off, l.offWrite-off)
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
Expand Down Expand Up @@ -367,3 +372,61 @@ func (format *diskFormatRow) toMutRow(fields []*types.FieldType) MutRow {
}
return MutRow{c: chk}
}

// ReaderWithCache helps to read data that has not be flushed to underlying layer.
// By using ReaderWithCache, user can still write data into ListInDisk even after reading.
type ReaderWithCache struct {
r io.ReaderAt
cacheOff int64
cache []byte
}

// NewReaderWithCache returns a ReaderWithCache
func NewReaderWithCache(r io.ReaderAt, cache []byte, cacheOff int64) *ReaderWithCache {
return &ReaderWithCache{
r: r,
cacheOff: cacheOff,
cache: cache,
}
}

// ReadAt implements the ReadAt interface
func (r *ReaderWithCache) ReadAt(p []byte, off int64) (readCnt int, err error) {
readCnt, err = r.r.ReadAt(p, off)
if err != io.EOF {
return readCnt, err
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
}

if len(p) == readCnt {
return readCnt, err
} else if len(p) < readCnt {
return readCnt, errors2.Trace(errors2.Errorf("cannot read more data than user requested"+
"(readCnt: %v, len(p): %v", readCnt, len(p)))
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
}

// When got here, user input is not filled fully, so we need read data from cache.
err = nil
if readCnt == 0 {
// readCnt == 0 means all user requested data resides in r.cache
beg := off - r.cacheOff
if beg < 0 {
panic("off must be greater than r.cacheOff when readCnt is 0")
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
}
end := int(beg) + len(p)
if end > len(r.cache) {
err = io.EOF
end = len(r.cache)
}
readCnt = copy(p, r.cache[beg:end])
} else {
// readCnt != 0 means only partial data of user requested resides in r.cache
p = p[readCnt:]
end := len(p)
if end > len(r.cache) {
err = io.EOF
end = len(r.cache)
}
readCnt += copy(p, r.cache[:end])
}
return readCnt, err
}
131 changes: 131 additions & 0 deletions util/chunk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package chunk

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"testing"
Expand All @@ -30,6 +32,8 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/checksum"
"github.com/pingcap/tidb/util/encrypt"
)

func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType) {
Expand Down Expand Up @@ -219,6 +223,8 @@ func (s *testChunkSuite) TestListInDiskWithChecksum(c *check.C) {
})
testListInDisk(c)

testReaderWithCache(c)
testReaderWithCacheNoFlush(c)
}

func (s *testChunkSuite) TestListInDiskWithChecksumAndEncrypt(c *check.C) {
Expand All @@ -227,4 +233,129 @@ func (s *testChunkSuite) TestListInDiskWithChecksumAndEncrypt(c *check.C) {
conf.Security.SpilledFileEncryptionMethod = config.SpilledFileEncryptionMethodAES128CTR
})
testListInDisk(c)

testReaderWithCache(c)
testReaderWithCacheNoFlush(c)
}

// checksum layer data layout:
// 4 B: checksum of this segment
// 8 B: all columns' length, in the following example, we will only have one column
// 1012 B: data in file. because max length of each segment is 1024, so we only have 1020B for user payload.
//
// Data in File Data in mem cache
// +------+------------------------------------------+ +-----------------------------+
// | | 1020B payload | | |
// |4Bytes| +---------+----------------------------+ | | |
// |checksum|8B collen| 1012B user data | | | 12B remained user data |
// | | +---------+----------------------------+ | | |
// | | | | |
// +------+------------------------------------------+ +-----------------------------+
func testReaderWithCache(c *check.C) {
testData := "0123456789"
buf := bytes.NewBuffer(nil)
for i := 0; i < 102; i++ {
buf.WriteString(testData)
}
buf.WriteString("0123")

field := []*types.FieldType{types.NewFieldType(mysql.TypeString)}
chk := NewChunkWithCapacity(field, 1)
chk.AppendString(0, buf.String())
l := NewListInDisk(field)
err := l.Add(chk)
c.Assert(err, check.IsNil)

// basic test for GetRow()
row, err := l.GetRow(RowPtr{0, 0})
c.Assert(err, check.IsNil)
c.Assert(row.GetDatumRow(field), check.DeepEquals, chk.GetRow(0).GetDatumRow(field))

var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())

// read all data
data := make([]byte, 1024)
// off is 8, because we want to ignore col length
readCnt, err := checksumReader.ReadAt(data, 8)
c.Assert(err, check.IsNil)
c.Assert(readCnt, check.Equals, 1024)
c.Assert(reflect.DeepEqual(data, buf.Bytes()), check.IsTrue)

// only read data of mem cache
data = make([]byte, 1024)
readCnt, err = checksumReader.ReadAt(data, 1020)
c.Assert(err, check.Equals, io.EOF)
c.Assert(readCnt, check.Equals, 12)
c.Assert(reflect.DeepEqual(data[:12], buf.Bytes()[1012:]), check.IsTrue)

// read partial data of mem cache
data = make([]byte, 1024)
readCnt, err = checksumReader.ReadAt(data, 1025)
c.Assert(err, check.Equals, io.EOF)
c.Assert(readCnt, check.Equals, 7)
c.Assert(reflect.DeepEqual(data[:7], buf.Bytes()[1017:]), check.IsTrue)

// read partial data from both file and mem cache
data = make([]byte, 1024)
readCnt, err = checksumReader.ReadAt(data, 1010)
c.Assert(err, check.Equals, io.EOF)
c.Assert(readCnt, check.Equals, 22)
c.Assert(reflect.DeepEqual(data[:22], buf.Bytes()[1002:]), check.IsTrue)

// off is too large, so no data is read
data = make([]byte, 1024)
readCnt, err = checksumReader.ReadAt(data, 1032)
c.Assert(err, check.Equals, io.EOF)
c.Assert(readCnt, check.Equals, 0)
c.Assert(reflect.DeepEqual(data, make([]byte, 1024)), check.IsTrue)

// only read 1 byte from mem cache
data = make([]byte, 1024)
readCnt, err = checksumReader.ReadAt(data, 1031)
c.Assert(err, check.Equals, io.EOF)
c.Assert(readCnt, check.Equals, 1)
c.Assert(reflect.DeepEqual(data[:1], buf.Bytes()[1023:]), check.IsTrue)

// test user requested data is small
// only request 10 bytes
data = make([]byte, 10)
readCnt, err = checksumReader.ReadAt(data, 1010)
c.Assert(err, check.IsNil)
c.Assert(readCnt, check.Equals, 10)
c.Assert(reflect.DeepEqual(data, buf.Bytes()[1002:1012]), check.IsTrue)
}

// here we test situations where size of data is small, so no data is flushed to disk
func testReaderWithCacheNoFlush(c *check.C) {
testData := "0123456789"

field := []*types.FieldType{types.NewFieldType(mysql.TypeString)}
chk := NewChunkWithCapacity(field, 1)
chk.AppendString(0, testData)
l := NewListInDisk(field)
err := l.Add(chk)
c.Assert(err, check.IsNil)

// basic test for GetRow()
row, err := l.GetRow(RowPtr{0, 0})
c.Assert(err, check.IsNil)
c.Assert(row.GetDatumRow(field), check.DeepEquals, chk.GetRow(0).GetDatumRow(field))

var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())

// read all data
data := make([]byte, 1024)
// off is 8, because we want to ignore col length
readCnt, err := checksumReader.ReadAt(data, 8)
c.Assert(err, check.Equals, io.EOF)
c.Assert(readCnt, check.Equals, len(testData))
c.Assert(reflect.DeepEqual(data[:10], []byte(testData)), check.IsTrue)
}
Loading