Skip to content

Commit

Permalink
[SYS-6179] Add MultiReadAsync file API.
Browse files Browse the repository at this point in the history
Extending the file API to provide an option to implement asynchronous
multi read operation. The default implementation delegates to AsyncRead.
Updated the AsyncFileReader class implementation to use MultiReadAsync.
  • Loading branch information
dpetrov4 committed Oct 13, 2023
1 parent 83e0108 commit 86c1e00
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 24 deletions.
6 changes: 3 additions & 3 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2035,8 +2035,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_.prefix_extractor, should_sample_file_read(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level, read_options), level,
&range_del_agg));
TableReaderCaller::kUserIterator, IsFilterSkipped(level, read_options),
level, &range_del_agg));
status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
iter.get(), overlap);
}
Expand Down Expand Up @@ -5648,7 +5648,7 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
}

if (edit->HasReplicationSequence()) {
replication_sequence_ = edit->GetReplicationSequence();
replication_sequence_ = edit->GetReplicationSequence();
}

// The builder can be nullptr only if edit is WAL manipulation,
Expand Down
103 changes: 103 additions & 0 deletions file/random_access_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
#include "file/random_access_file_reader.h"

#include <algorithm>
#include <cstddef>
#include <mutex>
#include <utility>

#include "file/file_util.h"
#include "monitoring/histogram.h"
Expand Down Expand Up @@ -599,4 +601,105 @@ void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
delete read_async_info;
}

// Callback data for non-direct IO version of MultiReadAsync.
struct MultiReadAsyncCbInfo {
MultiReadAsyncCbInfo(
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
uint64_t start_time)
: cb_(cb), cb_arg_(cb_arg), start_time_(start_time) {}

std::function<void(const FSReadRequest*, size_t, void*)> cb_;
void* cb_arg_;
uint64_t start_time_;
#ifndef ROCKSDB_LITE
FileOperationInfo::StartTimePoint fs_start_ts_;
#endif
};

IOStatus RandomAccessFileReader::MultiReadAsync(
FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
AlignedBuf* /* aligned_buf */) {
IOStatus s;
uint64_t elapsed = 0;

if (use_direct_io()) {
// DirectIO support not implemented for MultiReadAsync
abort();
}

// Create a callback and populate info.
auto read_async_callback = std::bind(
&RandomAccessFileReader::MultiReadAsyncCallback, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);

auto cb_info = new MultiReadAsyncCbInfo(cb, cb_arg, clock_->NowMicros());
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
cb_info->fs_start_ts_ = FileOperationInfo::StartNow();
}
#endif

StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
true /*overwrite*/, true /*delay_enabled*/);
s = file_->MultiReadAsync(reqs, num_reqs, opts, read_async_callback, cb_info,
io_handles, num_io_handles, del_fns, nullptr);

RecordTick(stats_, READ_ASYNC_MICROS, elapsed);

// Suppress false positive clang analyzer warnings.
// Memory is not released if file_->ReadAsync returns !s.ok(), because
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
// called then ReadAsync should always return IOStatus::OK().
#ifndef __clang_analyzer__
if (!s.ok()) {
delete cb_info;
}
#endif // __clang_analyzer__

return s;
}

void RandomAccessFileReader::MultiReadAsyncCallback(const FSReadRequest* reqs,
size_t n_reqs,
void* cb_arg) {
auto cb_info = static_cast<MultiReadAsyncCbInfo*>(cb_arg);
assert(cb_info);
assert(cb_info->cb_);

cb_info->cb_(reqs, n_reqs, cb_info->cb_arg_);

// Update stats and notify listeners.
if (stats_ != nullptr && file_read_hist_ != nullptr) {
// elapsed doesn't take into account delay and overwrite as StopWatch does
// in Read.
uint64_t elapsed = clock_->NowMicros() - cb_info->start_time_;
file_read_hist_->Add(elapsed);
}

for (size_t idx = 0; idx < n_reqs; idx++) {
auto& req = reqs[idx];
if (req.status.ok()) {
RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
} else if (!req.status.IsAborted()) {
RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(req.offset, req.result.size(),
cb_info->fs_start_ts_, finish_ts, req.status);
}
if (!req.status.ok()) {
NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
req.result.size(), req.offset);
}
#endif
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
}
delete cb_info;
}

} // namespace ROCKSDB_NAMESPACE
9 changes: 9 additions & 0 deletions file/random_access_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#pragma once
#include <atomic>
#include <cstddef>
#include <sstream>
#include <string>

Expand Down Expand Up @@ -212,6 +213,14 @@ class RandomAccessFileReader {
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
AlignedBuf* aligned_buf);

IOStatus MultiReadAsync(
FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
AlignedBuf* aligned_buf);

void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);
// Callback for non-directIO MultiReadAsync.
void MultiReadAsyncCallback(const FSReadRequest*, size_t, void*);
};
} // namespace ROCKSDB_NAMESPACE
81 changes: 81 additions & 0 deletions include/rocksdb/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <chrono>
#include <cstdarg>
#include <cstddef>
#include <functional>
#include <limits>
#include <memory>
Expand Down Expand Up @@ -918,6 +919,78 @@ class FSRandomAccessFile {
return IOStatus::OK();
}

// EXPERIMENTAL
// This API reads the requested data in a set of FSReadRequest asynchronously.
// This is an asynchronous call, i.e it should return after submitting the
// request.
//
// When the read request is completed, callback function specified in cb
// should be called with arguments cb_arg and the result populated in
// FSReadRequest with result and status fileds updated by FileSystem.
// cb_arg should be used by the callback to track the original request
// submitted.
//
// This API should also populate io_handles which should be used by
// underlying FileSystem to store the context in order to distinguish the read
// requests at their side and provide the custom deletion functions in
// del_fns. RocksDB guarantees that the del_fn for io_handle will be called
// after receiving the callback. Furthermore, RocksDB guarantees that if it
// calls the Poll API for this io_handle, del_fn will be called after the Poll
// returns. RocksDB is responsible for managing the lifetime of io_handles.
//
// The caller preallocates io_handles and del_fns arrays to be be the same
// size as the number of requests (num_reqs). num_io_handles parameter is
// used to pass out the information about how many io_handles (and
// corresponding del_funs) were populated during the call. num_io_handles
// must be pre-initiailized to the maximum size of io_handles/del_funs arrays
// (num_reqs) on the function call.
//
// reqs contains the request offset and size passed as input parameter of read
// request and result and status fields are output parameter set by underlying
// FileSystem. The data should always be read into scratch field.
//
// Default implementation delegates to ReadAsync for each request.
virtual IOStatus MultiReadAsync(
FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
IODebugContext* dbg) {
assert(*num_io_handles == num_reqs);
*num_io_handles = 0;
// Counter that we use keep track of how many individual async reads are
// still in progress.
std::atomic<size_t> in_flight = 0;
for (size_t idx = 0; idx < num_reqs; idx++) {
auto& req = reqs[idx];
auto local_cb = [&in_flight](const FSReadRequest&, void*) {
// We are done with this read, decrement the counter and signal.
in_flight--;
in_flight.notify_one();
};
auto status = ReadAsync(req, opts, local_cb, nullptr, &io_handles[idx],
&del_fns[idx], dbg);
if (status != IOStatus::OK()) {
// Delete in-progress IO-handles.
for (size_t k = 0; k < *num_io_handles; k++) {
if (io_handles[k] && del_fns[k]) {
del_fns[k](io_handles[k]);
io_handles[k] = nullptr;
}
}
*num_io_handles = 0;
return status;
}
(*num_io_handles)++;
in_flight++;
}
for (auto cv = in_flight.load(); cv != 0;
in_flight.wait(cv), cv = in_flight.load()) {
}
// the operation is completed, call the 'op_cb'
cb(reqs, num_reqs, cb_arg);
return IOStatus::OK();
}

// EXPERIMENTAL
// When available, returns the actual temperature for the file. This is
// useful in case some outside process moves a file from one tier to another,
Expand Down Expand Up @@ -1616,6 +1689,14 @@ class FSRandomAccessFileWrapper : public FSRandomAccessFile {
IODebugContext* dbg) override {
return target()->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, dbg);
}
IOStatus MultiReadAsync(
FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
IODebugContext* dbg) override {
return target()->MultiReadAsync(reqs, num_reqs, opts, cb, cb_arg,
io_handles, num_io_handles, del_fns, dbg);
}
Temperature GetTemperature() const override {
return target_->GetTemperature();
}
Expand Down
62 changes: 41 additions & 21 deletions util/async_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,76 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
num_reqs_ += awaiter->num_reqs_;
awaiter->io_handle_.resize(awaiter->num_reqs_);
awaiter->del_fn_.resize(awaiter->num_reqs_);
for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
IOStatus s = awaiter->file_->ReadAsync(
awaiter->read_reqs_[i], awaiter->opts_,
[](const FSReadRequest& req, void* cb_arg) {
FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
read_req->status = req.status;
read_req->result = req.result;
},
&awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i],
/*aligned_buf=*/nullptr);
if (!s.ok()) {
// For any non-ok status, the FileSystem will not call the callback
// So let's update the status ourselves
size_t num_io_handles = awaiter->num_reqs_;
IOStatus s = awaiter->file_->MultiReadAsync(
awaiter->read_reqs_, awaiter->num_reqs_, awaiter->opts_,
[](const FSReadRequest* reqs, size_t n_reqs, void* cb_arg) {
FSReadRequest* read_reqs = static_cast<FSReadRequest*>(cb_arg);
if (read_reqs != reqs) {
for (size_t idx = 0; idx < n_reqs; idx++) {
read_reqs[idx].status = reqs[idx].status;
read_reqs[idx].result = reqs[idx].result;
}
}
},
(void**)&awaiter->read_reqs_, (void**)&awaiter->io_handle_,
&num_io_handles, &awaiter->del_fn_[0],
/*aligned_buf=*/nullptr);
if (!s.ok()) {
assert(num_io_handles == 0);
// For any non-ok status, the FileSystem will not call the callback
// So let's update the status ourselves assuming the whole batch failed.
for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
awaiter->read_reqs_[i].status = s;
}
}
assert(num_io_handles <= awaiter->num_reqs_);
awaiter->io_handle_.resize(num_io_handles);
awaiter->del_fn_.resize(num_io_handles);
return true;
}

void AsyncFileReader::Wait() {
if (!head_) {
return;
}
ReadAwaiter* waiter;

// TODO: No need to copy if we have 1 awaiter.
// Poll API seems to encourage inefficiency.
std::vector<void*> io_handles;
IOStatus s;
io_handles.reserve(num_reqs_);
waiter = head_;

ReadAwaiter* waiter;
do {
for (size_t i = 0; i < waiter->num_reqs_; ++i) {
waiter = head_;
for (size_t i = 0; i < waiter->io_handle_.size(); ++i) {
if (waiter->io_handle_[i]) {
io_handles.push_back(waiter->io_handle_[i]);
}
}
} while (waiter != tail_ && (waiter = waiter->next_));

IOStatus s = IOStatus::OK();
if (io_handles.size() > 0) {
StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS);
s = fs_->Poll(io_handles, io_handles.size());
}

do {
waiter = head_;
head_ = waiter->next_;

for (size_t i = 0; i < waiter->num_reqs_; ++i) {
for (size_t i = 0; i < waiter->io_handle_.size(); ++i) {
if (waiter->io_handle_[i] && waiter->del_fn_[i]) {
waiter->del_fn_[i](waiter->io_handle_[i]);
}
if (waiter->read_reqs_[i].status.ok() && !s.ok()) {
// Override the request status with the Poll error
waiter->read_reqs_[i].status = s;
}
if (!s.ok()) {
for (size_t i = 0; i < waiter->num_reqs_; ++i) {
if (waiter->read_reqs_[i].status.ok()) {
// Override the request status with the Poll error
waiter->read_reqs_[i].status = s;
}
}
}
waiter->awaiting_coro_.resume();
Expand Down

0 comments on commit 86c1e00

Please sign in to comment.