Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor TableCache Get/NewIterator for single exit points #1534

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 115 additions & 113 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,77 +174,78 @@ InternalIterator* TableCache::NewIterator(
bool skip_filters, int level) {
PERF_TIMER_GUARD(new_table_iterator_nanos);

Status s;
if (range_del_agg != nullptr && !options.ignore_range_deletions) {
std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
options, icomparator, fd, file_read_hist, skip_filters, level));
Status s = range_del_iter->status();
s = range_del_iter->status();
if (s.ok()) {
s = range_del_agg->AddTombstones(std::move(range_del_iter));
}
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
}
}

if (table_reader_ptr != nullptr) {
*table_reader_ptr = nullptr;
}

bool create_new_table_reader = false;
TableReader* table_reader = nullptr;
Cache::Handle* handle = nullptr;

size_t readahead = 0;
bool create_new_table_reader = false;
if (for_compaction) {
if (ioptions_.new_table_reader_for_compaction_inputs) {
readahead = ioptions_.compaction_readahead_size;
create_new_table_reader = true;
}
} else {
readahead = options.readahead_size;
create_new_table_reader = readahead > 0;
}

if (create_new_table_reader) {
unique_ptr<TableReader> table_reader_unique_ptr;
Status s = GetTableReader(
env_options, icomparator, fd, true /* sequential_mode */, readahead,
!for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
false /* skip_filters */, level);
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
if (s.ok()) {
if (table_reader_ptr != nullptr) {
*table_reader_ptr = nullptr;
}
table_reader = table_reader_unique_ptr.release();
} else {
table_reader = fd.table_reader;
if (table_reader == nullptr) {
Status s = FindTable(env_options, icomparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
!for_compaction /* record read_stats */,
file_read_hist, skip_filters, level);
if (!s.ok()) {
return NewErrorInternalIterator(s, arena);
size_t readahead = 0;
if (for_compaction) {
if (ioptions_.new_table_reader_for_compaction_inputs) {
readahead = ioptions_.compaction_readahead_size;
create_new_table_reader = true;
}
table_reader = GetTableReaderFromHandle(handle);
} else {
readahead = options.readahead_size;
create_new_table_reader = readahead > 0;
}
}

InternalIterator* result =
table_reader->NewIterator(options, arena, skip_filters);
if (create_new_table_reader) {
assert(handle == nullptr);
result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
} else if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
if (create_new_table_reader) {
unique_ptr<TableReader> table_reader_unique_ptr;
s = GetTableReader(
env_options, icomparator, fd, true /* sequential_mode */, readahead,
!for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
false /* skip_filters */, level);
if (s.ok()) {
table_reader = table_reader_unique_ptr.release();
}
} else {
table_reader = fd.table_reader;
if (table_reader == nullptr) {
s = FindTable(env_options, icomparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
!for_compaction /* record read_stats */, file_read_hist,
skip_filters, level);
if (s.ok()) {
table_reader = GetTableReaderFromHandle(handle);
}
}
}
}
if (s.ok()) {
InternalIterator* result =
table_reader->NewIterator(options, arena, skip_filters);
if (create_new_table_reader) {
assert(handle == nullptr);
result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
} else if (handle != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
}

if (for_compaction) {
table_reader->SetupForCompaction();
if (for_compaction) {
table_reader->SetupForCompaction();
}
if (table_reader_ptr != nullptr) {
*table_reader_ptr = table_reader;
}
return result;
}
if (table_reader_ptr != nullptr) {
*table_reader_ptr = table_reader;
if (handle != nullptr) {
ReleaseHandle(handle);
}
return result;
return NewErrorInternalIterator(s);
}

InternalIterator* TableCache::NewRangeDeletionIterator(
Expand Down Expand Up @@ -281,89 +282,87 @@ Status TableCache::Get(const ReadOptions& options,
const FileDescriptor& fd, const Slice& k,
GetContext* get_context, HistogramImpl* file_read_hist,
bool skip_filters, int level) {
Status s;
if (get_context->range_del_agg() != nullptr &&
!options.ignore_range_deletions) {
std::unique_ptr<InternalIterator> range_del_iter(NewRangeDeletionIterator(
options, internal_comparator, fd, file_read_hist, skip_filters, level));
Status s = range_del_iter->status();
s = range_del_iter->status();
if (s.ok()) {
s = get_context->range_del_agg()->AddTombstones(
std::move(range_del_iter));
}
if (!s.ok()) {
return s;
}
}

TableReader* t = fd.table_reader;
Status s;
Cache::Handle* handle = nullptr;
std::string* row_cache_entry = nullptr;

bool done = false;
#ifndef ROCKSDB_LITE
IterKey row_cache_key;
std::string row_cache_entry_buffer;

// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k);
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number (incremented by 1 to
// distinguish from 0) only in this case.
uint64_t seq_no =
options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);

// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
row_cache_id_.size());
AppendVarint64(&row_cache_key, fd_number);
AppendVarint64(&row_cache_key, seq_no);
row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
user_key.size());

if (auto row_handle = ioptions_.row_cache->Lookup(row_cache_key.GetKey())) {
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
replayGetContextLog(*found_row_cache_entry, user_key, get_context);
ioptions_.row_cache->Release(row_handle);
RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
return Status::OK();
if (s.ok()) {
// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
uint64_t fd_number = fd.GetNumber();
auto user_key = ExtractUserKey(k);
// We use the user key as cache key instead of the internal key,
// otherwise the whole cache would be invalidated every time the
// sequence key increases. However, to support caching snapshot
// reads, we append the sequence number (incremented by 1 to
// distinguish from 0) only in this case.
uint64_t seq_no =
options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);

// Compute row cache key.
row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
row_cache_id_.size());
AppendVarint64(&row_cache_key, fd_number);
AppendVarint64(&row_cache_key, seq_no);
row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
user_key.size());

if (auto row_handle =
ioptions_.row_cache->Lookup(row_cache_key.GetKey())) {
auto found_row_cache_entry = static_cast<const std::string*>(
ioptions_.row_cache->Value(row_handle));
replayGetContextLog(*found_row_cache_entry, user_key, get_context);
ioptions_.row_cache->Release(row_handle);
RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
done = true;
} else {
// Not found, setting up the replay log.
RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
row_cache_entry = &row_cache_entry_buffer;
}
}

// Not found, setting up the replay log.
RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
row_cache_entry = &row_cache_entry_buffer;
}
#endif // ROCKSDB_LITE

if (!t) {
s = FindTable(env_options_, internal_comparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters,
level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
if (!done && s.ok()) {
if (!t) {
s = FindTable(env_options_, internal_comparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters,
level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
}
}
if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
s = t->Get(options, k, get_context, skip_filters);
get_context->SetReplayLog(nullptr);
if (handle != nullptr) {
ReleaseHandle(handle);
if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
s = t->Get(options, k, get_context, skip_filters);
get_context->SetReplayLog(nullptr);
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
get_context->MarkKeyMayExist();
s = Status::OK();
done = true;
}
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
get_context->MarkKeyMayExist();
return Status::OK();
}

#ifndef ROCKSDB_LITE
// Put the replay log in row cache only if something was found.
if (s.ok() && row_cache_entry && !row_cache_entry->empty()) {
if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
Expand All @@ -372,6 +371,9 @@ Status TableCache::Get(const ReadOptions& options,
}
#endif // ROCKSDB_LITE

if (handle != nullptr) {
ReleaseHandle(handle);
}
return s;
}

Expand Down