Skip to content

Commit

Permalink
executor: enhancement for ListInDisk(support writing after reading) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored May 19, 2021
1 parent 49d1eaa commit 424a5a8
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 15 deletions.
22 changes: 17 additions & 5 deletions util/checksum/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ var checksumReaderBufPool = sync.Pool{
// | -- 4B -- | -- 1020B -- || -- 4B -- | -- 1020B -- || -- 4B -- | -- 60B -- |
// | -- checksum -- | -- payload -- || -- checksum -- | -- payload -- || -- checksum -- | -- payload -- |
type Writer struct {
	// err remembers the error recorded by Flush when it fails.
	err error
	// w is the underlying destination handed to NewWriter.
	w io.WriteCloser
	// buf is the writer's scratch buffer.
	// NOTE(review): presumably one full checksum+payload block; confirm against NewWriter.
	buf []byte
	// payload holds user data that has been written but not flushed yet;
	// only payload[:payloadUsed] is meaningful.
	payload []byte
	// payloadUsed is the number of valid bytes currently held in payload.
	payloadUsed int
	// flushedUserDataCnt is the total number of user payload bytes already
	// flushed; Flush adds payloadUsed to it before resetting payloadUsed.
	flushedUserDataCnt int64
}

// NewWriter returns a new Writer which calculates and stores a CRC-32 checksum for the payload before
Expand Down Expand Up @@ -104,10 +105,21 @@ func (w *Writer) Flush() error {
w.err = err
return err
}
w.flushedUserDataCnt += int64(w.payloadUsed)
w.payloadUsed = 0
return nil
}

// GetCache exposes the user data that has been written to the Writer but not
// yet flushed. The returned slice is a view into the Writer's own payload
// buffer, not a copy.
func (w *Writer) GetCache() []byte {
	pending := w.payloadUsed
	return w.payload[:pending]
}

// GetCacheDataOffset returns the number of user payload bytes that have
// already been flushed, i.e. the offset within the user data stream at which
// the bytes returned by GetCache begin.
func (w *Writer) GetCacheDataOffset() int64 {
	flushed := w.flushedUserDataCnt
	return flushed
}

// Close implements the io.Closer interface.
func (w *Writer) Close() (err error) {
err = w.Flush()
Expand Down
72 changes: 72 additions & 0 deletions util/checksum/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,3 +651,75 @@ func (s *testChecksumSuite) testTiCase3651and3652(c *check.C, encrypt bool) {
assertReadAt(0, make([]byte, 10200), nil, 10200, strings.Repeat("0123456789", 1020), f1)
assertReadAt(0, make([]byte, 10200), nil, 10200, strings.Repeat("0123456789", 1020), f2)
}

// checkFlushedData reads readBufLen bytes at offset off from f through a
// checksum Reader and asserts the returned error, the byte count and the
// buffer contents against assertErr, assertN and assertRes respectively.
var checkFlushedData = func(c *check.C, f io.ReaderAt, off int64, readBufLen int, assertN int, assertErr interface{}, assertRes []byte) {
	got := make([]byte, readBufLen)
	n, err := NewReader(f).ReadAt(got, off)
	c.Assert(err, check.Equals, assertErr)
	c.Assert(n, check.Equals, assertN)
	cmp := bytes.Compare(got, assertRes)
	c.Assert(cmp, check.Equals, 0)
}

// TestChecksumWriter writes 1000 bytes, flushes explicitly, and verifies both
// the data readable from the file and the flushed-data offset of the Writer.
func (s *testChecksumSuite) TestChecksumWriter(c *check.C) {
	const fileName = "checksum_TestChecksumWriter"
	file, err := os.Create(fileName)
	c.Assert(err, check.IsNil)
	defer func() {
		// Close before removing so the file is no longer in use.
		c.Assert(file.Close(), check.IsNil)
		c.Assert(os.Remove(fileName), check.IsNil)
	}()

	// 100 repetitions of a 10-byte pattern: 1000 bytes of user data.
	payload := bytes.Repeat([]byte("0123456789"), 100)

	writer := NewWriter(file)
	written, err := writer.Write(payload)
	c.Assert(err, check.IsNil)
	c.Assert(written, check.Equals, 1000)

	// Push everything to the file, then read it back.
	c.Assert(writer.Flush(), check.IsNil)
	checkFlushedData(c, file, 0, 1000, 1000, nil, payload)

	// After the flush, the cache offset equals the amount of data written.
	c.Assert(writer.GetCacheDataOffset(), check.Equals, int64(1000))
}

// TestChecksumWriterAutoFlush fills the Writer with 1020 bytes, then writes
// one more byte and verifies that the first 1020 bytes became readable from
// the file without an explicit Flush call.
func (s *testChecksumSuite) TestChecksumWriterAutoFlush(c *check.C) {
	const fileName = "checksum_TestChecksumWriterAutoFlush"
	file, err := os.Create(fileName)
	c.Assert(err, check.IsNil)
	defer func() {
		// Close before removing so the file is no longer in use.
		c.Assert(file.Close(), check.IsNil)
		c.Assert(os.Remove(fileName), check.IsNil)
	}()

	writer := NewWriter(file)

	// 102 repetitions of a 10-byte pattern: 1020 bytes of user data.
	payload := bytes.Repeat([]byte("0123456789"), 102)
	written, err := writer.Write(payload)
	c.Assert(err, check.IsNil)
	c.Assert(written, check.Equals, len(payload))

	// One extra byte; the previously written data must now be on disk.
	written, err = writer.Write([]byte("0"))
	c.Assert(err, check.IsNil)
	c.Assert(written, check.Equals, 1)
	checkFlushedData(c, file, 0, 1020, 1020, nil, payload)
	c.Assert(writer.GetCacheDataOffset(), check.Equals, int64(len(payload)))
}
63 changes: 58 additions & 5 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type ListInDisk struct {
diskTracker *disk.Tracker // track disk usage.
numRowsInDisk int

checksumWriter *checksum.Writer
cipherWriter *encrypt.Writer

// ctrCipher stores the key and nonce used by the AES encryption I/O layer.
ctrCipher *encrypt.CtrCipher
}
Expand Down Expand Up @@ -78,9 +81,11 @@ func (l *ListInDisk) initDiskFile() (err error) {
if err != nil {
return
}
underlying = encrypt.NewWriter(l.disk, l.ctrCipher)
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher)
underlying = l.cipherWriter
}
l.w = checksum.NewWriter(underlying)
l.checksumWriter = checksum.NewWriter(underlying)
l.w = l.checksumWriter
l.bufFlushMutex = sync.RWMutex{}
return
}
Expand Down Expand Up @@ -164,16 +169,16 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {

// GetRow gets a Row from the ListInDisk by RowPtr.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
err = l.flush()
if err != nil {
return
}
off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = encrypt.NewReader(l.disk, l.ctrCipher)
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
r := io.NewSectionReader(checksum.NewReader(underlying), off, l.offWrite-off)
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
Expand Down Expand Up @@ -367,3 +372,51 @@ func (format *diskFormatRow) toMutRow(fields []*types.FieldType) MutRow {
}
return MutRow{c: chk}
}

// ReaderWithCache helps to read data that has not been flushed to the underlying layer.
// It wraps an io.ReaderAt together with a byte slice of not-yet-flushed data, so
// that users can still write data into ListInDisk even after reading.
type ReaderWithCache struct {
	// r is the reader consulted first by ReadAt.
	r io.ReaderAt
	// cacheOff is the offset, in the coordinate space of r, at which cache begins.
	cacheOff int64
	// cache holds the data that ReadAt falls back to when r reports io.EOF
	// before the request is filled.
	cache []byte
}

// NewReaderWithCache builds a ReaderWithCache that reads from r first and
// falls back to cache, whose first byte corresponds to offset cacheOff.
// The cache slice is stored as-is, not copied.
func NewReaderWithCache(r io.ReaderAt, cache []byte, cacheOff int64) *ReaderWithCache {
	reader := new(ReaderWithCache)
	reader.r = r
	reader.cache = cache
	reader.cacheOff = cacheOff
	return reader
}

// ReadAt implements the io.ReaderAt interface.
//
// It first reads from the underlying reader. Only when that read ends with
// io.EOF before p is full does it serve the remaining bytes from the cache,
// whose first byte corresponds to offset r.cacheOff.
//
// It returns the total number of bytes placed into p. err is nil when p was
// filled from the cache, io.EOF when fewer than len(p) bytes were available
// (including when off lies at or beyond the end of the cached data), or any
// non-EOF error reported by the underlying reader. If the underlying reader
// fills p completely, its result (possibly io.EOF) is returned unchanged.
func (r *ReaderWithCache) ReadAt(p []byte, off int64) (readCnt int, err error) {
	readCnt, err = r.r.ReadAt(p, off)
	if err != io.EOF {
		return readCnt, err
	}

	switch {
	case readCnt == len(p):
		// The request is already satisfied; there is nothing to take from the cache.
		return readCnt, err
	case readCnt > len(p):
		return readCnt, errors2.Trace(errors2.Errorf("cannot read more data than user requested"+
			"(readCnt: %v, len(p): %v", readCnt, len(p)))
	}

	// When got here, user input is not filled fully, so we need read data from cache.
	rest := p[readCnt:]
	beg := off - r.cacheOff
	if beg < 0 {
		// This happens when only partial data of user requested resides in r.cache.
		beg = 0
	}
	// Compare in int64 so a huge offset can neither overflow int nor produce an
	// inverted slice range: an offset at or past the end of the cache has no
	// data to offer and is simply EOF.
	if beg >= int64(len(r.cache)) {
		return readCnt, io.EOF
	}

	copied := copy(rest, r.cache[beg:])
	readCnt += copied
	if copied < len(rest) {
		// The cache ran out before the request was filled.
		return readCnt, io.EOF
	}
	return readCnt, nil
}
131 changes: 131 additions & 0 deletions util/chunk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package chunk

import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"testing"
Expand All @@ -30,6 +32,8 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/checksum"
"github.com/pingcap/tidb/util/encrypt"
)

func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType) {
Expand Down Expand Up @@ -219,6 +223,8 @@ func (s *testChunkSuite) TestListInDiskWithChecksum(c *check.C) {
})
testListInDisk(c)

testReaderWithCache(c)
testReaderWithCacheNoFlush(c)
}

func (s *testChunkSuite) TestListInDiskWithChecksumAndEncrypt(c *check.C) {
Expand All @@ -227,4 +233,129 @@ func (s *testChunkSuite) TestListInDiskWithChecksumAndEncrypt(c *check.C) {
conf.Security.SpilledFileEncryptionMethod = config.SpilledFileEncryptionMethodAES128CTR
})
testListInDisk(c)

testReaderWithCache(c)
testReaderWithCacheNoFlush(c)
}

// Following diagram describes the testdata we use to test:
// 4 B: checksum of this segment.
// 8 B: all columns' length, in the following example, we will only have one column.
// 1012 B: user data in file. The max length of each segment is 1024B, so only 1020B is left for the payload (8B column length + 1012B user data).
//
// Data in File Data in mem cache
// +------+------------------------------------------+ +-----------------------------+
// | | 1020B payload | | |
// |4Bytes| +---------+----------------------------+ | | |
// |checksum|8B collen| 1012B user data | | | 12B remained user data |
// | | +---------+----------------------------+ | | |
// | | | | |
// +------+------------------------------------------+ +-----------------------------+
func testReaderWithCache(c *check.C) {
	// 1024 bytes of user data: 102 repetitions of a 10-byte pattern plus 4 bytes.
	content := strings.Repeat("0123456789", 102) + "0123"
	want := []byte(content)

	field := []*types.FieldType{types.NewFieldType(mysql.TypeString)}
	chk := NewChunkWithCapacity(field, 1)
	chk.AppendString(0, content)
	l := NewListInDisk(field)
	c.Assert(l.Add(chk), check.IsNil)

	// Basic test for GetRow().
	row, err := l.GetRow(RowPtr{0, 0})
	c.Assert(err, check.IsNil)
	c.Assert(row.GetDatumRow(field), check.DeepEquals, chk.GetRow(0).GetDatumRow(field))

	// Build the same reader stack that GetRow() uses.
	var underlying io.ReaderAt = l.disk
	if l.ctrCipher != nil {
		underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
	}
	checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())

	// Offsets are in payload space: the first 8 bytes are the column length,
	// so user data byte i lives at offset i+8.
	cases := []struct {
		bufLen   int    // size of the destination buffer
		off      int64  // offset passed to ReadAt
		wantErr  error  // nil means the read must succeed
		wantCnt  int    // expected number of bytes read
		cmpLen   int    // how many leading bytes of the buffer are compared
		wantData []byte // expected content of the compared prefix
	}{
		// Read all data; offset is 8 because we want to ignore col length.
		{1024, 8, nil, 1024, 1024, want},
		// Only read data of mem cache.
		{1024, 1020, io.EOF, 12, 12, want[1012:]},
		// Read partial data of mem cache.
		{1024, 1025, io.EOF, 7, 7, want[1017:]},
		// Read partial data from both file and mem cache.
		{1024, 1010, io.EOF, 22, 22, want[1002:]},
		// Offset is too large, so no data is read and the buffer stays zeroed.
		{1024, 1032, io.EOF, 0, 1024, make([]byte, 1024)},
		// Only read 1 byte from mem cache.
		{1024, 1031, io.EOF, 1, 1, want[1023:]},
		// User requested data is small: only request 10 bytes.
		{10, 1010, nil, 10, 10, want[1002:1012]},
	}
	for _, tc := range cases {
		data := make([]byte, tc.bufLen)
		readCnt, err := checksumReader.ReadAt(data, tc.off)
		if tc.wantErr == nil {
			c.Assert(err, check.IsNil)
		} else {
			c.Assert(err, check.Equals, tc.wantErr)
		}
		c.Assert(readCnt, check.Equals, tc.wantCnt)
		c.Assert(reflect.DeepEqual(data[:tc.cmpLen], tc.wantData), check.IsTrue)
	}
}

// testReaderWithCacheNoFlush covers the case where the data is so small that
// nothing has been flushed to disk, so every byte read comes from the cache.
func testReaderWithCacheNoFlush(c *check.C) {
	const content = "0123456789"

	field := []*types.FieldType{types.NewFieldType(mysql.TypeString)}
	chk := NewChunkWithCapacity(field, 1)
	chk.AppendString(0, content)
	l := NewListInDisk(field)
	c.Assert(l.Add(chk), check.IsNil)

	// Basic test for GetRow().
	row, err := l.GetRow(RowPtr{0, 0})
	c.Assert(err, check.IsNil)
	c.Assert(row.GetDatumRow(field), check.DeepEquals, chk.GetRow(0).GetDatumRow(field))

	// Build the same reader stack that GetRow() uses.
	var underlying io.ReaderAt = l.disk
	if l.ctrCipher != nil {
		underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
	}
	checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())

	// Read all data. Offset is 8, because we want to ignore col length.
	readBuf := make([]byte, 1024)
	readCnt, err := checksumReader.ReadAt(readBuf, 8)
	c.Assert(err, check.Equals, io.EOF)
	c.Assert(readCnt, check.Equals, len(content))
	c.Assert(reflect.DeepEqual(readBuf[:10], []byte(content)), check.IsTrue)
}
Loading

0 comments on commit 424a5a8

Please sign in to comment.