Skip to content

Commit

Permalink
add isReaderStale
Browse files Browse the repository at this point in the history
Signed-off-by: guo-shaoge <shaoge1994@163.com>
  • Loading branch information
guo-shaoge committed May 17, 2021
1 parent 405eded commit c45b41d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
19 changes: 14 additions & 5 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type ListInDisk struct {
checksumWriter *checksum.Writer
cipherWriter *encrypt.Writer

// By using this flag, we don't need to allocate a new reader every time.
isReaderStale bool
r io.ReaderAt

// ctrCipher stores the key and nonce using by aes encrypt io layer
ctrCipher *encrypt.CtrCipher
}
Expand All @@ -60,7 +64,8 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
l := &ListInDisk{
fieldTypes: fieldTypes,
// TODO(fengliyuan): set the quota of disk usage.
diskTracker: disk.NewTracker(memory.LabelForChunkListInDisk, -1),
diskTracker: disk.NewTracker(memory.LabelForChunkListInDisk, -1),
isReaderStale: true,
}
return l
}
Expand Down Expand Up @@ -150,6 +155,7 @@ func (l *ListInDisk) Add(chk *Chunk) (err error) {
l.offsets = append(l.offsets, chk2.getOffsetsOfRows())
l.diskTracker.Consume(n)
l.numRowsInDisk += chk.NumRows()
l.isReaderStale = true
return
}

Expand All @@ -174,11 +180,14 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
}
off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
if l.isReaderStale {
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
l.r = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
l.isReaderStale = false
}
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
r := io.NewSectionReader(l.r, off, l.offWrite-off)
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions util/chunk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func testReaderWithCache(c *check.C) {
chk := NewChunkWithCapacity(field, 1)
chk.AppendString(0, buf.String())
l := NewListInDisk(field)
c.Assert(l.isReaderStale, check.IsTrue)
err := l.Add(chk)
c.Assert(err, check.IsNil)

Expand Down Expand Up @@ -327,6 +328,32 @@ func testReaderWithCache(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(readCnt, check.Equals, 10)
c.Assert(reflect.DeepEqual(data, buf.Bytes()[1002:1012]), check.IsTrue)

// Test l.isReaderStale works properly
// It means only new reader is alloced after writing.
oriReader := l.r
for i := 0; i < 100; i++ {
row, _ = l.GetRow(RowPtr{0, 0})
c.Assert(oriReader == l.r, check.IsTrue)
c.Assert(l.isReaderStale, check.IsFalse)
}
// After write, reader is stale.
err = l.Add(chk)
c.Assert(err, check.IsNil)
c.Assert(oriReader == l.r, check.IsTrue)
c.Assert(l.isReaderStale, check.IsTrue)

// New reader is generated when reading.
row, err = l.GetRow(RowPtr{1, 0})
c.Assert(oriReader != l.r, check.IsTrue)
c.Assert(l.isReaderStale, check.IsFalse)
oriReader = l.r

for i := 0; i < 100; i++ {
row, _ = l.GetRow(RowPtr{0, 0})
c.Assert(oriReader == l.r, check.IsTrue)
c.Assert(l.isReaderStale, check.IsFalse)
}
}

// Here we test situations where size of data is small, so no data is flushed to disk.
Expand Down

0 comments on commit c45b41d

Please sign in to comment.