Add AsyncIO support for tuning readahead_size by block cache lookup (#11936)

Summary:
Add support for tuning readahead_size by block cache lookup for async_io.

**Design / Implementation**

**BlockBasedTableIterator.cc** -

The `BlockCacheLookupForReadAheadSize` callback API looks up blocks in the block cache and tries to shrink the start
and end offsets passed to it. It checks the block cache for the blocks between `start_offset`
and `end_offset` and adds all the handles to a queue.

It then iterates over the handles from the end to find the first missed block and updates the end offset to that block.
It likewise iterates from the start to find the first missed block and updates the start offset to that block.

```
read_curr_block argument:
  true  - this call was due to a miss in the cache and the caller wants to
          read that block synchronously.
  false - the current call is to prefetch additional data in extra buffers
          (due to a ReadAsync call in FilePrefetchBuffer).
```
If there is no data to be read in that callback (because of upper_bound or because all blocks are in the cache),
it sets the start and end offsets equal, which `FilePrefetchBuffer` interprets as a zero-length read.

**FilePrefetchBuffer.cc** -

`FilePrefetchBuffer` calls `ReadAheadSizeTuning`, which passes the start and end offsets to the
callback and gets back updated offsets to read, based on cache hits/misses.

1. For Read calls (when the offset passed to FilePrefetchBuffer is a cache miss and that data needs to be read), _read_curr_block_ is passed as true.
2. For ReadAsync calls, when the buffer is fully consumed and additional prefetching can proceed, the start offset passed is the initial end offset of the previous buffer (without any offset updates based on cache hits/misses).
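The callback signature this commit introduces (see `file_prefetch_buffer.h` below) takes the `read_curr_block` flag and the two offsets by reference. The wiring can be sketched as follows; `TuneWindow` is a hypothetical name, the real logic lives in `ReadAheadSizeTuning`:

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Callback signature from this commit: (read_curr_block, start&, end&).
using ReadaheadSizeCb = std::function<void(bool, uint64_t&, uint64_t&)>;

// Hypothetical sketch of handing the prefetch window to the callback,
// which mutates both offsets in place based on cache hits/misses.
void TuneWindow(const ReadaheadSizeCb& cb, bool read_curr_block,
                uint64_t& start_offset, uint64_t& end_offset) {
  if (cb) {
    cb(read_curr_block, start_offset, end_offset);
  }
  // If start_offset == end_offset afterwards, there is nothing to read.
}
```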

For example, suppose the following data blocks with their cache hit/miss status and start offsets,
and the Read API found a miss on DB1; based on readahead_size (50), it passes an end offset of 50:
 [DB1 - miss - 0]  [DB2 - hit - 10] [DB3 - miss - 20] [DB4 - miss - 30] [DB5 - hit - 40]
 [DB6 - hit - 50]  [DB7 - miss - 60] [DB8 - miss - 70] [DB9 - hit - 80] [DB10 - hit - 90]

- For the Read call, the updated start offset remains 0, but the end offset is updated to the end of DB4, since DB5 is in the cache.
- The Read call saves the initial end offset of 50, as that was the range originally meant to be prefetched.
- For the next ReadAsync call, the start offset will be 50 (the previous buffer's initial end offset) and, based on readahead_size, the end offset will be 100.
- In the callback, because of cache hits, the start offset is updated to 60 and the end offset to 80, so only 2 data blocks (DB7 and DB8) are read.
- For that ReadAsync call, the initial end offset is set to 100, which will in turn be used by the next ReadAsync call as its start offset.
- `initial_end_offset_` in `BufferInfo` is used to save the initial end offset of each buffer.

- If, say, DB5 and DB6 overlap across 2 buffers (because of alignment), `prev_buf_end_offset` is passed to make sure already prefetched data is not prefetched again in the second buffer.
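The walk-through above can be replayed with a small model of the buffer chaining. This is a sketch under stated assumptions: the fixed 10-byte blocks, the `kHit` table, and `Trim` are hypothetical; only the `initial_end_offset_` field mirrors what this commit adds to `BufferInfo`.

```cpp
#include <cassert>
#include <cstdint>

// Cache state for DB1..DB10 at offsets 0, 10, ..., 90 (from the example):
// miss, hit, miss, miss, hit, hit, miss, miss, hit, hit.
const bool kHit[10] = {false, true, false, false, true,
                       true,  false, false, true,  true};

// Trim [start, end) to the span of cache-missed 10-byte blocks (sketch).
void Trim(uint64_t& start, uint64_t& end) {
  uint64_t first = end, last = end;
  for (uint64_t off = start; off < end; off += 10) {
    if (!kHit[off / 10]) {
      if (first == end) first = off;
      last = off + 10;
    }
  }
  if (first == end) {
    end = start;  // everything cached: zero-length read
    return;
  }
  start = first;
  end = last;
}

// Mirrors the field added to BufferInfo in this commit: the end offset the
// buffer was originally asked to cover, used as the next prefetch's start.
struct BufferInfo {
  uint64_t initial_end_offset_ = 0;
};
```

Replaying the example: the Read call's window [0, 50) trims to [0, 40) and saves `initial_end_offset_ = 50`; the next ReadAsync call starts at 50, extends to 100 by readahead_size, trims to [60, 80) (only DB7 and DB8), and saves `initial_end_offset_ = 100` for the call after that.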

Pull Request resolved: #11936

Test Plan:
- Ran crash_test several times.
- Added new unit tests.

Reviewed By: anand1976

Differential Revision: D50906217

Pulled By: akankshamahajan15

fbshipit-source-id: 0d75d3c98274e98aa34901b201b8fb05232139cf
akankshamahajan15 authored and facebook-github-bot committed Dec 6, 2023
1 parent 0ebe161 commit c77b50a
Showing 12 changed files with 977 additions and 277 deletions.
392 changes: 229 additions & 163 deletions file/file_prefetch_buffer.cc

Large diffs are not rendered by default.

63 changes: 42 additions & 21 deletions file/file_prefetch_buffer.h
@@ -32,6 +32,11 @@ struct IOOptions;
class RandomAccessFileReader;

struct BufferInfo {
void ClearBuffer() {
buffer_.Clear();
initial_end_offset_ = 0;
}

AlignedBuffer buffer_;

uint64_t offset_ = 0;
@@ -52,6 +57,18 @@ struct BufferInfo {

// pos represents the index of this buffer in vector of BufferInfo.
uint32_t pos_ = 0;

// initial_end_offset is used to keep track of the end offset of the buffer
// that was originally called. It's helpful in case of autotuning of readahead
// size when callback is made to BlockBasedTableIterator.
// initial end offset of this buffer which will be the starting
// offset of next prefetch.
//
// For example - if end offset of previous buffer was 100 and because of
// readahead_size optimization, end_offset was trimmed to 60. Then for next
// prefetch call, start_offset should be initialized to 100, i.e. start_offset =
// buf->initial_end_offset_.
uint64_t initial_end_offset_ = 0;
};

enum class FilePrefetchBufferUsage {
@@ -91,7 +108,7 @@ class FilePrefetchBuffer {
uint64_t num_file_reads_for_auto_readahead = 0,
uint64_t upper_bound_offset = 0, FileSystem* fs = nullptr,
SystemClock* clock = nullptr, Statistics* stats = nullptr,
const std::function<void(uint64_t, size_t, size_t&)>& cb = nullptr,
const std::function<void(bool, uint64_t&, uint64_t&)>& cb = nullptr,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown)
: curr_(0),
readahead_size_(readahead_size),
@@ -239,9 +256,6 @@ class FilePrefetchBuffer {
void UpdateReadPattern(const uint64_t& offset, const size_t& len,
bool decrease_readaheadsize) {
if (decrease_readaheadsize) {
// Since this block was eligible for prefetch but it was found in
// cache, so check and decrease the readahead_size by 8KB (default)
// if eligible.
DecreaseReadAheadIfEligible(offset, len);
}
prev_offset_ = offset;
@@ -287,6 +301,12 @@ class FilePrefetchBuffer {
readahead_size_ = initial_auto_readahead_size_;
}

void TEST_GetBufferOffsetandSize(uint32_t index, uint64_t& offset,
size_t& len) {
offset = bufs_[index].offset_;
len = bufs_[index].buffer_.CurrentSize();
}

private:
// Calculates roundoff offset and length to be prefetched based on alignment
// and data present in buffer_. It also allocates new buffer or refit tail if
@@ -299,24 +319,24 @@ class FilePrefetchBuffer {

void AbortAllIOs();

void UpdateBuffersIfNeeded(uint64_t offset);
void UpdateBuffersIfNeeded(uint64_t offset, size_t len);

// It calls Poll API if any there is any pending asynchronous request. It then
// checks if data is in any buffer. It clears the outdated data and swaps the
// buffers if required.
void PollAndUpdateBuffersIfNeeded(uint64_t offset);
void PollAndUpdateBuffersIfNeeded(uint64_t offset, size_t len);

Status PrefetchAsyncInternal(const IOOptions& opts,
RandomAccessFileReader* reader, uint64_t offset,
size_t length, size_t readahead_size,
bool& copy_to_third_buffer);

Status Read(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t read_len, uint64_t chunk_len, uint64_t rounddown_start,
uint64_t read_len, uint64_t chunk_len, uint64_t start_offset,
uint32_t index);

Status ReadAsync(const IOOptions& opts, RandomAccessFileReader* reader,
uint64_t read_len, uint64_t rounddown_start, uint32_t index);
uint64_t read_len, uint64_t start_offset, uint32_t index);

// Copy the data from src to third buffer.
void CopyDataToBuffer(uint32_t src, uint64_t& offset, size_t& length);
@@ -402,7 +422,7 @@ class FilePrefetchBuffer {
return false;
}

bufs_[second].buffer_.Clear();
bufs_[second].ClearBuffer();
return true;
}

@@ -451,19 +471,20 @@ class FilePrefetchBuffer {
return false;
}

// Performs tuning to calculate readahead_size.
size_t ReadAheadSizeTuning(uint64_t offset, size_t n) {
UpdateReadAheadSizeForUpperBound(offset, n);
void ReadAheadSizeTuning(bool read_curr_block, bool refit_tail,
uint64_t prev_buf_end_offset, uint32_t index,
size_t alignment, size_t length,
size_t readahead_size, uint64_t& offset,
uint64_t& end_offset, size_t& read_len,
uint64_t& chunk_len);

if (readaheadsize_cb_ != nullptr && readahead_size_ > 0) {
size_t updated_readahead_size = 0;
readaheadsize_cb_(offset, readahead_size_, updated_readahead_size);
if (readahead_size_ != updated_readahead_size) {
RecordTick(stats_, READAHEAD_TRIMMED);
}
return updated_readahead_size;
void UpdateStats(bool found_in_buffer, size_t length_found) {
if (found_in_buffer) {
RecordTick(stats_, PREFETCH_HITS);
}
if (length_found > 0) {
RecordTick(stats_, PREFETCH_BYTES_USEFUL, length_found);
}
return readahead_size_;
}

std::vector<BufferInfo> bufs_;
@@ -512,6 +533,6 @@ class FilePrefetchBuffer {
// ReadOptions.auto_readahead_size are set to trim readahead_size upto
// upper_bound_offset_ during prefetching.
uint64_t upper_bound_offset_ = 0;
std::function<void(uint64_t, size_t, size_t&)> readaheadsize_cb_;
std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb_;
};
} // namespace ROCKSDB_NAMESPACE
4 changes: 4 additions & 0 deletions file/prefetch_test.cc
@@ -1325,6 +1325,10 @@ TEST_P(PrefetchTest, PrefetchWithBlockLookupAutoTuneTest) {
ropts.readahead_size = cmp_ro.readahead_size = 32768;
}

if (std::get<1>(GetParam())) {
ropts.async_io = true;
}

// With and without tuning readahead_size.
{
ASSERT_OK(options.statistics->Reset());
5 changes: 3 additions & 2 deletions file/random_access_file_reader.cc
@@ -492,8 +492,9 @@ IOStatus RandomAccessFileReader::ReadAsync(
auto read_async_callback =
std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2);
ReadAsyncInfo* read_async_info =
new ReadAsyncInfo(cb, cb_arg, clock_->NowMicros());

ReadAsyncInfo* read_async_info = new ReadAsyncInfo(
cb, cb_arg, (clock_ != nullptr ? clock_->NowMicros() : 0));

if (ShouldNotifyListeners()) {
read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();