Skip to content

Commit

Permalink
vfs: Add prefetchFile, use it for reading ahead data blocks
Browse files Browse the repository at this point in the history
For sequential-like IO workload where we read
data blocks one after the other in quick succession,
signalling the OS to asynchronously bring them to
cache in advance can deliver significant savings in IOPS
dispatched. In IOPS-bound workloads such as backup on
an EBS disk, this delivers a 3x speedup. Presumably
aggregate queries and compactions will be faster
as well, though this hasn't been benchmarked in
practice yet.

This change maintains a counter for the number of data
block reads performed in a singleLevelIterator, and
when that count exceeds 2, a readahead system
call is made on Linux. RocksDB has almost
the exact same behaviour, including the same
min/max readahead sizes and read count thresholds.

Will address cockroachdb/cockroach#49710
when it lands in cockroach.
  • Loading branch information
itsbilal committed Jun 3, 2020
1 parent 660b76a commit 01340fb
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 14 deletions.
65 changes: 52 additions & 13 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ import (

var errCorruptIndexEntry = errors.New("pebble/table: corrupt index entry")

const (
	// Constants for dynamic readahead of data blocks.

	// minFileReadsForReadahead is the minimum number of data block reads in a
	// file before readahead syscalls start being issued for subsequent reads.
	minFileReadsForReadahead = 2
	// initialReadaheadSize is the size of the first readahead request; the
	// request size is doubled on each subsequent readahead, up to
	// maxReadaheadSize.
	initialReadaheadSize = 8 << 10 /* 8KB */
	// maxReadaheadSize caps the per-request readahead size.
	maxReadaheadSize = 256 << 10 /* 256KB */
)

// decodeBlockHandle returns the block handle encoded at the start of src, as
// well as the number of bytes it occupies. It returns zero if given invalid
// input.
Expand Down Expand Up @@ -72,6 +79,7 @@ type singleLevelIterator struct {
reader *Reader
index blockIter
data blockIter
dataAttrs blockAttrs
dataBH BlockHandle
err error
closeHook func(i Iterator) error
Expand Down Expand Up @@ -146,6 +154,7 @@ func (i *singleLevelIterator) init(r *Reader, lower, upper []byte) error {
_ = i.index.Close()
return err
}
i.dataAttrs.readaheadSize = initialReadaheadSize
return nil
}

Expand Down Expand Up @@ -196,7 +205,7 @@ func (i *singleLevelIterator) loadBlock() bool {
i.err = errCorruptIndexEntry
return false
}
block, err := i.reader.readBlock(i.dataBH, nil /* transform */)
block, err := i.reader.readBlock(i.dataBH, nil /* transform */, &i.dataAttrs)
if err != nil {
i.err = err
return false
Expand Down Expand Up @@ -591,7 +600,7 @@ func (i *twoLevelIterator) loadIndex() bool {
i.err = errors.New("pebble/table: corrupt top level index entry")
return false
}
indexBlock, err := i.reader.readBlock(h, nil /* transform */)
indexBlock, err := i.reader.readBlock(h, nil /* transform */, nil /* attrs */)
if err != nil {
i.err = err
return false
Expand Down Expand Up @@ -906,6 +915,15 @@ func (i *twoLevelCompactionIterator) skipForward(

type blockTransform func([]byte) ([]byte, error)

// blockAttrs contains state variables that are updated upon block access,
// and retained between accesses for that type of block (index, data, filter,
// etc).
type blockAttrs struct {
	// numReads counts readBlock calls for this block type that missed the
	// block cache and had to read from the file.
	numReads uint64
	// readaheadSize is the number of bytes to request in the next readahead
	// call; it is doubled after each call, capped at maxReadaheadSize.
	readaheadSize uint64
	// readaheadLimit is the file offset up to which readahead has already
	// been requested; reads of blocks below this offset issue no new
	// readahead.
	readaheadLimit uint64
}

// ReaderOption provide an interface to do work on Reader while it is being
// opened.
type ReaderOption interface {
Expand Down Expand Up @@ -1154,23 +1172,44 @@ func (r *Reader) NewRangeDelIter() (base.InternalIterator, error) {
}

func (r *Reader) readIndex() (cache.Handle, error) {
return r.readBlock(r.indexBH, nil /* transform */)
return r.readBlock(r.indexBH, nil /* transform */, nil /* attrs */)
}

func (r *Reader) readFilter() (cache.Handle, error) {
return r.readBlock(r.filterBH, nil /* transform */)
return r.readBlock(r.filterBH, nil /* transform */, nil /* attrs */)
}

func (r *Reader) readRangeDel() (cache.Handle, error) {
return r.readBlock(r.rangeDelBH, r.rangeDelTransform)
return r.readBlock(r.rangeDelBH, r.rangeDelTransform, nil /* attrs */)
}

// readBlock reads and decompresses a block from disk into memory.
func (r *Reader) readBlock(bh BlockHandle, transform blockTransform) (cache.Handle, error) {
func (r *Reader) readBlock(bh BlockHandle, transform blockTransform, attrs *blockAttrs) (cache.Handle, error) {
if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil {
return h, nil
}

if attrs != nil {
attrs.numReads++
// If this type of block has been read multiple times in this file so
// far, signal the OS to asynchronously read ahead multiple blocks.
if attrs.numReads > minFileReadsForReadahead {
if pf, ok := r.file.(vfs.Prefetch); ok && bh.Offset + bh.Length + blockTrailerLen > attrs.readaheadLimit {
// Prefetching causes test runs with the race detector to often
// run out of memory, so disable it if the race detector is
// running.
if !invariants.RaceEnabled {
_ = pf.Prefetch(bh.Offset, attrs.readaheadSize)
}
attrs.readaheadLimit = bh.Offset + attrs.readaheadSize
attrs.readaheadSize *= 2
if attrs.readaheadSize > maxReadaheadSize {
attrs.readaheadSize = maxReadaheadSize
}
}
}
}

v := r.opts.Cache.Alloc(int(bh.Length + blockTrailerLen))
b := v.Buf()
if _, err := r.file.ReadAt(b, int64(bh.Offset)); err != nil {
Expand Down Expand Up @@ -1280,7 +1319,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) {
}

func (r *Reader) readMetaindex(metaindexBH BlockHandle) error {
b, err := r.readBlock(metaindexBH, nil /* transform */)
b, err := r.readBlock(metaindexBH, nil /* transform */, nil /* attrs */)
if err != nil {
return err
}
Expand Down Expand Up @@ -1310,7 +1349,7 @@ func (r *Reader) readMetaindex(metaindexBH BlockHandle) error {
}

if bh, ok := meta[metaPropertiesName]; ok {
b, err = r.readBlock(bh, nil /* transform */)
b, err = r.readBlock(bh, nil /* transform */, nil /* attrs */)
if err != nil {
return err
}
Expand Down Expand Up @@ -1402,7 +1441,7 @@ func (r *Reader) Layout() (*Layout, error) {
}
l.Index = append(l.Index, indexBH)

subIndex, err := r.readBlock(indexBH, nil /* transform */)
subIndex, err := r.readBlock(indexBH, nil /* transform */, nil /* attrs */)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1468,7 +1507,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
if n == 0 || n != len(val) {
return 0, errCorruptIndexEntry
}
startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */)
startIdxBlock, err := r.readBlock(startIdxBH, nil /* transform */, nil /* attrs */)
if err != nil {
return 0, err
}
Expand All @@ -1488,7 +1527,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
if n == 0 || n != len(val) {
return 0, errCorruptIndexEntry
}
endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */)
endIdxBlock, err := r.readBlock(endIdxBH, nil /* transform */, nil /* attrs */)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -1536,7 +1575,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) {
func NewReader(f vfs.File, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
o = o.ensureDefaults()
r := &Reader{
file: f,
file: vfs.NewPrefetchFile(f),
opts: o,
}
if r.opts.Cache == nil {
Expand Down Expand Up @@ -1677,7 +1716,7 @@ func (l *Layout) Describe(
continue
}

h, err := r.readBlock(b.BlockHandle, nil /* transform */)
h, err := r.readBlock(b.BlockHandle, nil /* transform */, nil /* attrs */)
if err != nil {
fmt.Fprintf(w, " [err: %s]\n", err)
continue
Expand Down
2 changes: 1 addition & 1 deletion sstable/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func TestMetaIndexEntriesSorted(t *testing.T) {
r, err := NewReader(f, ReaderOptions{})
require.NoError(t, err)

b, err := r.readBlock(r.metaIndexBH, nil /* transform */)
b, err := r.readBlock(r.metaIndexBH, nil /* transform */, nil /* attrs */)
require.NoError(t, err)
defer b.Release()

Expand Down
35 changes: 35 additions & 0 deletions vfs/prefetch_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package vfs

// prefetchFile wraps a File and retains its raw file descriptor (when the
// underlying file exposes one), so that platform-specific Prefetch
// implementations can issue readahead syscalls against it.
type prefetchFile struct {
	File
	fd uintptr
}

// Prefetch is an interface for a file that supports prefetching/readahead.
type Prefetch interface {
Prefetch(offset uint64, size uint64) error
}

// NewPrefetchFile wraps a readable file and adds a Prefetch() method on
// supported platforms.
// NewPrefetchFile wraps a readable file and adds a Prefetch() method on
// supported platforms.
func NewPrefetchFile(f File) File {
	if f == nil {
		return nil
	}

	pf := &prefetchFile{File: f}

	// Capture the underlying descriptor when the wrapped file exposes one;
	// the platform-specific Prefetch implementations issue readahead on it.
	type descriptor interface {
		Fd() uintptr
	}
	if d, ok := f.(descriptor); ok {
		pf.fd = d.Fd()
	}
	return pf
}
15 changes: 15 additions & 0 deletions vfs/prefetch_file_generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

// +build !linux

package vfs

// Prefetch, on supported platforms, signals the OS to fetch the next size
// bytes after offset into cache. Any subsequent reads in that range will
// not issue disk IO.
// Prefetch, on supported platforms, signals the OS to fetch the next size
// bytes after offset into cache. Any subsequent reads in that range will
// not issue disk IO. On this platform it is a no-op and always returns nil.
func (p *prefetchFile) Prefetch(offset uint64, size uint64) error {
	return nil
}
17 changes: 17 additions & 0 deletions vfs/prefetch_file_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

// +build linux

package vfs

import "syscall"

// Prefetch, on supported platforms, signals the OS to fetch the next size
// bytes after offset into cache. Any subsequent reads in that range will
// not issue disk IO.
// Prefetch, on supported platforms, signals the OS to fetch the next size
// bytes after offset into cache, via the readahead(2) syscall. Any
// subsequent reads in that range will not issue disk IO.
func (p *prefetchFile) Prefetch(offset uint64, size uint64) error {
	_, _, errno := syscall.Syscall(syscall.SYS_READAHEAD, uintptr(p.fd), uintptr(offset), uintptr(size))
	if errno != 0 {
		return errno
	}
	// Do not return errno directly on success: a syscall.Errno of 0 boxed
	// into the error interface compares non-nil, which would make every
	// successful Prefetch look like a failure to callers.
	return nil
}

0 comments on commit 01340fb

Please sign in to comment.