Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group rocksdb.sst.read.micros stat by IOActivity flush and compaction #11288

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
### New Features
* Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called.
* Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()`
* New statistics `rocksdb.file.read.{flush|compaction}.micros` that measure read time of block-based SST tables or blob files during flush or compaction.

### Bug Fixes
* In block cache tracing, fixed some cases of bad hit/miss information (and more) with MultiGet.
Expand Down
4 changes: 2 additions & 2 deletions db/blob/blob_file_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ BlobFileCache::BlobFileCache(Cache* cache,
}

Status BlobFileCache::GetBlobFileReader(
uint64_t blob_file_number,
const ReadOptions& read_options, uint64_t blob_file_number,
CacheHandleGuard<BlobFileReader>* blob_file_reader) {
assert(blob_file_reader);
assert(blob_file_reader->IsEmpty());
Expand Down Expand Up @@ -73,7 +73,7 @@ Status BlobFileCache::GetBlobFileReader(
{
assert(file_options_);
const Status s = BlobFileReader::Create(
*immutable_options_, *file_options_, column_family_id_,
*immutable_options_, read_options, *file_options_, column_family_id_,
blob_file_read_hist_, blob_file_number, io_tracer_, &reader);
if (!s.ok()) {
RecordTick(statistics, NO_FILE_ERRORS);
Expand Down
3 changes: 2 additions & 1 deletion db/blob/blob_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class BlobFileCache {
BlobFileCache(const BlobFileCache&) = delete;
BlobFileCache& operator=(const BlobFileCache&) = delete;

Status GetBlobFileReader(uint64_t blob_file_number,
Status GetBlobFileReader(const ReadOptions& read_options,
uint64_t blob_file_number,
CacheHandleGuard<BlobFileReader>* blob_file_reader);

private:
Expand Down
25 changes: 17 additions & 8 deletions db/blob/blob_file_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,18 @@ TEST_F(BlobFileCacheTest, GetBlobFileReader) {
// First try: reader should be opened and put in cache
CacheHandleGuard<BlobFileReader> first;

ASSERT_OK(blob_file_cache.GetBlobFileReader(blob_file_number, &first));
const ReadOptions read_options;
ASSERT_OK(blob_file_cache.GetBlobFileReader(read_options, blob_file_number,
&first));
ASSERT_NE(first.GetValue(), nullptr);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 0);

// Second try: reader should be served from cache
CacheHandleGuard<BlobFileReader> second;

ASSERT_OK(blob_file_cache.GetBlobFileReader(blob_file_number, &second));
ASSERT_OK(blob_file_cache.GetBlobFileReader(read_options, blob_file_number,
&second));
ASSERT_NE(second.GetValue(), nullptr);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 0);
Expand Down Expand Up @@ -163,19 +166,21 @@ TEST_F(BlobFileCacheTest, GetBlobFileReader_Race) {
CacheHandleGuard<BlobFileReader> first;
CacheHandleGuard<BlobFileReader> second;

const ReadOptions read_options;
SyncPoint::GetInstance()->SetCallBack(
"BlobFileCache::GetBlobFileReader:DoubleCheck", [&](void* /* arg */) {
// Disabling sync points to prevent infinite recursion
SyncPoint::GetInstance()->DisableProcessing();

ASSERT_OK(blob_file_cache.GetBlobFileReader(blob_file_number, &second));
ASSERT_OK(blob_file_cache.GetBlobFileReader(read_options,
blob_file_number, &second));
ASSERT_NE(second.GetValue(), nullptr);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 0);
});
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(blob_file_cache.GetBlobFileReader(blob_file_number, &first));
ASSERT_OK(blob_file_cache.GetBlobFileReader(read_options, blob_file_number,
&first));
ASSERT_NE(first.GetValue(), nullptr);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 0);
Expand Down Expand Up @@ -213,8 +218,10 @@ TEST_F(BlobFileCacheTest, GetBlobFileReader_IOError) {

CacheHandleGuard<BlobFileReader> reader;

const ReadOptions read_options;
ASSERT_TRUE(
blob_file_cache.GetBlobFileReader(blob_file_number, &reader).IsIOError());
blob_file_cache.GetBlobFileReader(read_options, blob_file_number, &reader)
.IsIOError());
ASSERT_EQ(reader.GetValue(), nullptr);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 1);
Expand Down Expand Up @@ -253,8 +260,10 @@ TEST_F(BlobFileCacheTest, GetBlobFileReader_CacheFull) {
// strict_capacity_limit is set
CacheHandleGuard<BlobFileReader> reader;

ASSERT_TRUE(blob_file_cache.GetBlobFileReader(blob_file_number, &reader)
.IsMemoryLimit());
const ReadOptions read_options;
ASSERT_TRUE(
blob_file_cache.GetBlobFileReader(read_options, blob_file_number, &reader)
.IsMemoryLimit());
ASSERT_EQ(reader.GetValue(), nullptr);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_OPENS), 1);
ASSERT_EQ(options.statistics->getTickerCount(NO_FILE_ERRORS), 1);
Expand Down
59 changes: 39 additions & 20 deletions db/blob/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
namespace ROCKSDB_NAMESPACE {

Status BlobFileReader::Create(
const ImmutableOptions& immutable_options, const FileOptions& file_options,
uint32_t column_family_id, HistogramImpl* blob_file_read_hist,
uint64_t blob_file_number, const std::shared_ptr<IOTracer>& io_tracer,
const ImmutableOptions& immutable_options, const ReadOptions& read_options,
const FileOptions& file_options, uint32_t column_family_id,
HistogramImpl* blob_file_read_hist, uint64_t blob_file_number,
const std::shared_ptr<IOTracer>& io_tracer,
std::unique_ptr<BlobFileReader>* blob_file_reader) {
assert(blob_file_reader);
assert(!*blob_file_reader);
Expand All @@ -52,15 +53,17 @@ Status BlobFileReader::Create(
CompressionType compression_type = kNoCompression;

{
const Status s = ReadHeader(file_reader.get(), column_family_id, statistics,
&compression_type);
const Status s =
ReadHeader(file_reader.get(), read_options, column_family_id,
statistics, &compression_type);
if (!s.ok()) {
return s;
}
}

{
const Status s = ReadFooter(file_reader.get(), file_size, statistics);
const Status s =
ReadFooter(file_reader.get(), read_options, file_size, statistics);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -134,6 +137,7 @@ Status BlobFileReader::OpenFile(
}

Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint32_t column_family_id,
Statistics* statistics,
CompressionType* compression_type) {
Expand All @@ -151,9 +155,10 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
constexpr size_t read_size = BlobLogHeader::kSize;

// TODO: rate limit reading headers from blob files.
const Status s = ReadFromFile(file_reader, read_offset, read_size,
statistics, &header_slice, &buf, &aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
const Status s =
ReadFromFile(file_reader, read_options, read_offset, read_size,
statistics, &header_slice, &buf, &aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -187,6 +192,7 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
}

Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint64_t file_size, Statistics* statistics) {
assert(file_size >= BlobLogHeader::kSize + BlobLogFooter::kSize);
assert(file_reader);
Expand All @@ -202,9 +208,10 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
constexpr size_t read_size = BlobLogFooter::kSize;

// TODO: rate limit reading footers from blob files.
const Status s = ReadFromFile(file_reader, read_offset, read_size,
statistics, &footer_slice, &buf, &aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
const Status s =
ReadFromFile(file_reader, read_options, read_offset, read_size,
statistics, &footer_slice, &buf, &aligned_buf,
Env::IO_TOTAL /* rate_limiter_priority */);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -232,6 +239,7 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
}

Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint64_t read_offset, size_t read_size,
Statistics* statistics, Slice* slice,
Buffer* buf, AlignedBuf* aligned_buf,
Expand All @@ -246,17 +254,23 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,

Status s;

IOOptions io_options;
s = file_reader->PrepareIOOptions(read_options, io_options);
if (!s.ok()) {
return s;
}

if (file_reader->use_direct_io()) {
constexpr char* scratch = nullptr;

s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
s = file_reader->Read(io_options, read_offset, read_size, slice, scratch,
aligned_buf, rate_limiter_priority);
} else {
buf->reset(new char[read_size]);
constexpr AlignedBuf* aligned_scratch = nullptr;

s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
buf->get(), aligned_scratch, rate_limiter_priority);
s = file_reader->Read(io_options, read_offset, read_size, slice, buf->get(),
aligned_scratch, rate_limiter_priority);
}

if (!s.ok()) {
Expand Down Expand Up @@ -324,8 +338,13 @@ Status BlobFileReader::GetBlob(
Status s;
constexpr bool for_compaction = true;

IOOptions io_options;
s = file_reader_->PrepareIOOptions(read_options, io_options);
if (!s.ok()) {
return s;
}
prefetched = prefetch_buffer->TryReadFromCache(
IOOptions(), file_reader_.get(), record_offset,
io_options, file_reader_.get(), record_offset,
static_cast<size_t>(record_size), &record_slice, &s,
read_options.rate_limiter_priority, for_compaction);
if (!s.ok()) {
Expand All @@ -338,10 +357,10 @@ Status BlobFileReader::GetBlob(
PERF_COUNTER_ADD(blob_read_count, 1);
PERF_COUNTER_ADD(blob_read_byte, record_size);
PERF_TIMER_GUARD(blob_read_time);
const Status s = ReadFromFile(file_reader_.get(), record_offset,
static_cast<size_t>(record_size), statistics_,
&record_slice, &buf, &aligned_buf,
read_options.rate_limiter_priority);
const Status s = ReadFromFile(
file_reader_.get(), read_options, record_offset,
static_cast<size_t>(record_size), statistics_, &record_slice, &buf,
&aligned_buf, read_options.rate_limiter_priority);
if (!s.ok()) {
return s;
}
Expand Down
6 changes: 5 additions & 1 deletion db/blob/blob_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Statistics;
class BlobFileReader {
public:
static Status Create(const ImmutableOptions& immutable_options,
const ReadOptions& read_options,
const FileOptions& file_options,
uint32_t column_family_id,
HistogramImpl* blob_file_read_hist,
Expand Down Expand Up @@ -74,15 +75,18 @@ class BlobFileReader {
std::unique_ptr<RandomAccessFileReader>* file_reader);

static Status ReadHeader(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint32_t column_family_id, Statistics* statistics,
CompressionType* compression_type);

static Status ReadFooter(const RandomAccessFileReader* file_reader,
uint64_t file_size, Statistics* statistics);
const ReadOptions& read_options, uint64_t file_size,
Statistics* statistics);

using Buffer = std::unique_ptr<char[]>;

static Status ReadFromFile(const RandomAccessFileReader* file_reader,
const ReadOptions& read_options,
uint64_t read_offset, size_t read_size,
Statistics* statistics, Slice* slice, Buffer* buf,
AlignedBuf* aligned_buf,
Expand Down
Loading