Skip to content

Commit

Permalink
wip - make check passes
Browse files Browse the repository at this point in the history
  • Loading branch information
udi-speedb committed Sep 18, 2023
1 parent 9f60375 commit 4472a17
Show file tree
Hide file tree
Showing 21 changed files with 103 additions and 45 deletions.
12 changes: 8 additions & 4 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ CompactionJob::CompactionJob(
output_directory_(output_directory),
stats_(stats),
bottommost_level_(false),
last_level_with_data_(false),
write_hint_(Env::WLTH_NOT_SET),
compaction_job_stats_(compaction_job_stats),
job_id_(job_id),
Expand Down Expand Up @@ -253,12 +254,14 @@ void CompactionJob::Prepare() {
// Generate file_levels_ for compaction before making Iterator
auto* c = compact_->compaction;
ColumnFamilyData* cfd = c->column_family_data();
auto vstorage = cfd->current()->storage_info();
assert(cfd != nullptr);
assert(cfd->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
assert(vstorage->NumLevelFiles(compact_->compaction->level()) > 0);

write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
last_level_with_data_ =
(vstorage->GetLastLevelWithData() == c->output_level());

if (c->ShouldFormSubcompactions()) {
StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
Expand Down Expand Up @@ -1879,8 +1882,9 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
sub_compact->compaction->output_compression(),
sub_compact->compaction->output_compression_opts(), cfd->GetID(),
cfd->GetName(), sub_compact->compaction->output_level(),
bottommost_level_, TableFileCreationReason::kCompaction,
0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
bottommost_level_, last_level_with_data_,
TableFileCreationReason::kCompaction, 0 /* oldest_key_time */,
current_time, db_id_, db_session_id_,
sub_compact->compaction->max_output_file_size(), file_number);

outputs.NewBuilder(tboptions);
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ class CompactionJob {
Statistics* stats_;
// Is this compaction creating a file in the bottom most level?
bool bottommost_level_;

// Is this compaction creating a file in the last level with data?
bool last_level_with_data_ = false;
Env::WriteLifeTimeHint write_hint_;

IOStatus io_status_;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
0 /* level */, false /* is_bottommost */,
false /* is_last_level_with_data */,
TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
0 /* file_creation_time */, db_id_, db_session_id_,
0 /* target_file_size */, meta.fd.GetNumber());
Expand Down
1 change: 1 addition & 0 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1, /*bottommost*/ false,
/*last_level_with_data*/ false,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
Expand Down
6 changes: 3 additions & 3 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,9 @@ Status FlushJob::WriteLevel0Table() {
cfd_->int_tbl_prop_collector_factories(), output_compression_,
mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(),
0 /* level */, false /* is_bottommost */,
TableFileCreationReason::kFlush, oldest_key_time, current_time,
db_id_, db_session_id_, 0 /* target_file_size */,
meta_.fd.GetNumber());
false /* is_last_level_with_data */, TableFileCreationReason::kFlush,
oldest_key_time, current_time, db_id_, db_session_id_,
0 /* target_file_size */, meta_.fd.GetNumber());
const SequenceNumber job_snapshot_seq =
job_context_->GetJobSnapshotSequence();
s = BuildTable(
Expand Down
1 change: 1 addition & 0 deletions db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1, /*bottommost*/ false,
/*last_level_with_data*/ false,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
Expand Down
1 change: 1 addition & 0 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ class Repairer {
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
kNoCompression, default_compression, cfd->GetID(), cfd->GetName(),
-1 /* level */, false /* is_bottommost */,
false /* is_last_level_with_data */,
TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
0 /* file_creation_time */, "DB Repairer" /* db_id */, db_session_id_,
0 /*target_file_size*/, meta.fd.GetNumber());
Expand Down
16 changes: 11 additions & 5 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,17 @@ TableCache::TableCache(const ImmutableOptions& ioptions,
const FileOptions* file_options, Cache* const cache,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id)
const std::string& db_session_id,
GetLastLevelWithDataFunc get_last_leve_with_data_func)
: ioptions_(ioptions),
file_options_(*file_options),
cache_(cache),
immortal_tables_(false),
block_cache_tracer_(block_cache_tracer),
loader_mutex_(kLoadConcurency, kGetSliceNPHash64UnseededFnPtr),
io_tracer_(io_tracer),
db_session_id_(db_session_id) {
db_session_id_(db_session_id),
get_last_leve_with_data_func_(get_last_leve_with_data_func) {
if (ioptions_.row_cache) {
// If the same cache is shared by multiple instances, we need to
// disambiguate its entries.
Expand Down Expand Up @@ -138,12 +140,16 @@ Status TableCache::GetTableReader(
expected_unique_id = kNullUniqueId64x2; // null ID == no verification
}

auto is_last_level_with_data = is_bottom;
if (get_last_leve_with_data_func_) {
is_last_level_with_data = get_last_leve_with_data_func_();
}
TableReaderOptions table_reader_options(
ioptions_, prefix_extractor, file_options, internal_comparator,
skip_filters, immortal_tables_, false /* force_direct_prefetch */,
level, is_bottom, block_cache_tracer_, max_file_size_for_l0_meta_pin,
db_session_id_, file_meta.fd.GetNumber(), expected_unique_id,
file_meta.fd.largest_seqno);
level, is_bottom, is_last_level_with_data, block_cache_tracer_,
max_file_size_for_l0_meta_pin, db_session_id_, file_meta.fd.GetNumber(),
expected_unique_id, file_meta.fd.largest_seqno);
table_reader_options.cache_owner_id = cache_owner_id_;

s = ioptions_.table_factory->NewTableReader(
Expand Down
8 changes: 7 additions & 1 deletion db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

Expand Down Expand Up @@ -49,12 +50,16 @@ class HistogramImpl;
// cache, lookup is very fast. The row cache is obtained from
// ioptions.row_cache
class TableCache {
public:
using GetLastLevelWithDataFunc = std::function<int()>;

public:
TableCache(const ImmutableOptions& ioptions,
const FileOptions* storage_options, Cache* cache,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id);
const std::string& db_session_id,
GetLastLevelWithDataFunc get_last_leve_with_data_func = nullptr);
~TableCache();

// Cache interface for table cache
Expand Down Expand Up @@ -273,6 +278,7 @@ class TableCache {
std::shared_ptr<IOTracer> io_tracer_;
std::string db_session_id_;
Cache::ItemOwnerId cache_owner_id_ = Cache::kUnknownItemOwnerId;
GetLastLevelWithDataFunc get_last_leve_with_data_func_;
};

} // namespace ROCKSDB_NAMESPACE
12 changes: 12 additions & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4280,6 +4280,18 @@ void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
}
}

int VersionStorageInfo::GetLastLevelWithData() const {
// By default, it's the last possible level
int last_level_with_data = num_levels() - 1;

// Now scan upwards, and find the last level that has data
while ((last_level_with_data > 1) &&
LevelFiles(last_level_with_data).empty()) {
--last_level_with_data;
}
return last_level_with_data;
}

uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < num_levels());
Expand Down
6 changes: 6 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ class VersionStorageInfo {
return num_non_empty_levels_;
}

// Finds the last level that contains data.
// This level is usually not the same as the last potential level as
// configured by the AdvancedColumnFamilyOptions's num_levels option.
// REQUIRES: This version has been saved (see VersionBuilder::SaveTo)
int GetLastLevelWithData() const;

// REQUIRES: PrepareForVersionAppend has been called
// This may or may not return number of level files. It is to keep backward
// compatible behavior in universal compaction.
Expand Down
8 changes: 4 additions & 4 deletions include/rocksdb/table_pinning_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ struct ConfigOptions;
struct TablePinningOptions {
TablePinningOptions() = default;

TablePinningOptions(int _level, bool _is_bottom, size_t _file_size,
size_t _max_file_size_for_l0_meta_pin)
TablePinningOptions(int _level, bool _is_last_level_with_data,
size_t _file_size, size_t _max_file_size_for_l0_meta_pin)
: level(_level),
is_bottom(_is_bottom),
is_last_level_with_data(_is_last_level_with_data),
file_size(_file_size),
max_file_size_for_l0_meta_pin(_max_file_size_for_l0_meta_pin) {}
int level = -1;
bool is_bottom = false;
bool is_last_level_with_data = false;
size_t file_size = 0;
size_t max_file_size_for_l0_meta_pin = 0;
};
Expand Down
10 changes: 6 additions & 4 deletions plugin/speedb/pinning_policy/scoped_pinning_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"capacity",
{offsetof(struct ScopedPinningOptions, capacity), OptionType::kSizeT,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"bottom_percent",
{offsetof(struct ScopedPinningOptions, bottom_percent),
{"last_level_with_data_percent",
{offsetof(struct ScopedPinningOptions, last_level_with_data_percent),
OptionType::kUInt32T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"mid_percent",
Expand All @@ -53,8 +53,10 @@ bool ScopedPinningPolicy::CheckPin(const TablePinningOptions& tpo,
uint8_t /* type */, size_t size,
size_t usage) const {
auto proposed = usage + size;
if (tpo.is_bottom && options_.bottom_percent > 0) {
if (proposed > (options_.capacity * options_.bottom_percent / 100)) {
if (tpo.is_last_level_with_data &&
options_.last_level_with_data_percent > 0) {
if (proposed >
(options_.capacity * options_.last_level_with_data_percent / 100)) {
return false;
}
} else if (tpo.level > 0 && options_.mid_percent > 0) {
Expand Down
4 changes: 2 additions & 2 deletions plugin/speedb/pinning_policy/scoped_pinning_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ struct ScopedPinningOptions {
// Limit to how much data should be pinned
size_t capacity = 1024 * 1024 * 1024; // 1GB

// Percent of capacity at which not to pin bottom-most data
uint32_t bottom_percent = 10;
// Percent of capacity at which not to pin last-leve-with-data data
uint32_t last_level_with_data_percent = 10;
// Percent of capacity at which not to pin non-L0 data
uint32_t mid_percent = 80;
};
Expand Down
21 changes: 13 additions & 8 deletions plugin/speedb/pinning_policy/scoped_pinning_policy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,19 @@ TEST_F(ScopedPinningPolicyTest, GetOptions) {
auto opts = policy->GetOptions<ScopedPinningOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->capacity, ScopedPinningOptions().capacity);
ASSERT_EQ(opts->bottom_percent, ScopedPinningOptions().bottom_percent);
ASSERT_EQ(opts->last_level_with_data_percent,
ScopedPinningOptions().last_level_with_data_percent);
ASSERT_EQ(opts->mid_percent, ScopedPinningOptions().mid_percent);
ASSERT_TRUE(policy->IsInstanceOf(ScopedPinningPolicy::kClassName()));

ASSERT_OK(TablePinningPolicy::CreateFromString(
cfg, id + "; capacity=2048; bottom_percent=22; mid_percent=33", &policy));
cfg,
id + "; capacity=2048; last_level_with_data_percent=22; mid_percent=33",
&policy));
opts = policy->GetOptions<ScopedPinningOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->capacity, 2048);
ASSERT_EQ(opts->bottom_percent, 22);
ASSERT_EQ(opts->last_level_with_data_percent, 22);
ASSERT_EQ(opts->mid_percent, 33);
ASSERT_TRUE(policy->IsInstanceOf(ScopedPinningPolicy::kClassName()));
}
Expand All @@ -85,11 +88,13 @@ TEST_F(ScopedPinningPolicyTest, GetManaged) {

std::string id = std::string("id=") + ScopedPinningPolicy::kClassName();
ASSERT_OK(TablePinningPolicy::CreateFromString(
cfg, id + "; capacity=2048; bottom_percent=22; mid_percent=33", &policy));
cfg,
id + "; capacity=2048; last_level_with_data_percent=22; mid_percent=33",
&policy));
auto opts = policy->GetOptions<ScopedPinningOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->capacity, 2048);
ASSERT_EQ(opts->bottom_percent, 22);
ASSERT_EQ(opts->last_level_with_data_percent, 22);
ASSERT_EQ(opts->mid_percent, 33);
ASSERT_TRUE(policy->IsInstanceOf(ScopedPinningPolicy::kClassName()));
std::shared_ptr<TablePinningPolicy> copy;
Expand All @@ -99,13 +104,13 @@ TEST_F(ScopedPinningPolicyTest, GetManaged) {
ASSERT_OK(TablePinningPolicy::CreateFromString(
cfg,
"id= " + policy->GetId() +
"; capacity=4096; bottom_percent=11; mid_percent=44",
"; capacity=4096; last_level_with_data_percent=11; mid_percent=44",
&copy));
ASSERT_EQ(copy, policy);
opts = policy->GetOptions<ScopedPinningOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->capacity, 2048);
ASSERT_EQ(opts->bottom_percent, 22);
ASSERT_EQ(opts->last_level_with_data_percent, 22);
ASSERT_EQ(opts->mid_percent, 33);
}

Expand All @@ -114,7 +119,7 @@ TEST_F(ScopedPinningPolicyTest, TestLimits) {
auto opts = policy->GetOptions<ScopedPinningOptions>();
ASSERT_NE(opts, nullptr);
auto capacity = opts->capacity;
size_t bottom = capacity * opts->bottom_percent / 100;
size_t bottom = capacity * opts->last_level_with_data_percent / 100;
size_t mid = capacity * opts->mid_percent / 100;

TablePinningOptions l0(0, false, 0, 0); // Level 0
Expand Down
4 changes: 2 additions & 2 deletions table/block_based/block_based_table_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,8 @@ Status BlockBasedTableFactory::NewTableReader(
file_size, table_reader, table_reader_cache_res_mgr_,
table_reader_options.prefix_extractor, prefetch_index_and_filter_in_cache,
table_reader_options.skip_filters, table_reader_options.level,
table_reader_options.is_bottommost, table_reader_options.immortal,
table_reader_options.largest_seqno,
table_reader_options.is_last_level_with_data,
table_reader_options.immortal, table_reader_options.largest_seqno,
table_reader_options.force_direct_prefetch, &tail_prefetch_stats_,
table_reader_options.block_cache_tracer,
table_reader_options.max_file_size_for_l0_meta_pin,
Expand Down
4 changes: 2 additions & 2 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ Status BlockBasedTable::Open(
std::shared_ptr<CacheReservationManager> table_reader_cache_res_mgr,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
const bool prefetch_index_and_filter_in_cache, const bool skip_filters,
const int level, bool is_bottom, const bool immortal_table,
const int level, bool is_last_level_with_data, const bool immortal_table,
const SequenceNumber largest_seqno, const bool force_direct_prefetch,
TailPrefetchStats* tail_prefetch_stats,
BlockCacheTracer* const block_cache_tracer,
Expand Down Expand Up @@ -765,7 +765,7 @@ Status BlockBasedTable::Open(
if (!s.ok()) {
return s;
}
TablePinningOptions tpo(level, is_bottom, file_size,
TablePinningOptions tpo(level, is_last_level_with_data, file_size,
max_file_size_for_l0_meta_pin);
s = new_table->PrefetchIndexAndFilterBlocks(
ro, prefetch_buffer.get(), metaindex_iter.get(), new_table.get(),
Expand Down
4 changes: 2 additions & 2 deletions table/block_based/block_based_table_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ class BlockBasedTable : public TableReader {
nullptr,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
bool prefetch_index_and_filter_in_cache = true, bool skip_filters = false,
int level = -1, bool is_bottom = false, const bool immortal_table = false,
const SequenceNumber largest_seqno = 0,
int level = -1, bool is_last_level_with_data = false,
const bool immortal_table = false, const SequenceNumber largest_seqno = 0,
bool force_direct_prefetch = false,
TailPrefetchStats* tail_prefetch_stats = nullptr,
BlockCacheTracer* const block_cache_tracer = nullptr,
Expand Down
7 changes: 4 additions & 3 deletions table/sst_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,10 @@ Status SstFileWriter::Open(const std::string& file_path) {
r->ioptions, r->mutable_cf_options, r->internal_comparator,
&int_tbl_prop_collector_factories, compression_type, compression_opts,
cf_id, r->column_family_name, unknown_level, false /* is_bottommost */,
TableFileCreationReason::kMisc, 0 /* oldest_key_time */,
0 /* file_creation_time */, "SST Writer" /* db_id */, r->db_session_id,
0 /* target_file_size */, r->next_file_number);
false /* is_last_level_with_data */, TableFileCreationReason::kMisc,
0 /* oldest_key_time */, 0 /* file_creation_time */,
"SST Writer" /* db_id */, r->db_session_id, 0 /* target_file_size */,
r->next_file_number);
// External SST files used to each get a unique session id. Now for
// slightly better uniqueness probability in constructing cache keys, we
// assign fake file numbers to each file (into table properties) and keep
Expand Down
Loading

0 comments on commit 4472a17

Please sign in to comment.