Skip to content

Commit

Permalink
spdb memtable: fix use_seek_parallel_threshold handleding (#603)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayulas authored and udi-speedb committed Nov 21, 2023
1 parent 1c6644f commit c486d55
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 30 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

### Bug Fixes
* unit tests: fix GlobalWriteControllerTest.GlobalAndWBMSetupDelay by waiting for the memtable memory release.
* spdb memtable: use_seek_parallel_threshold option parameter mishandled (#570)

## Fig v2.5.0 (06/14/2023)
Based on RocksDB 8.1.1
Expand Down
54 changes: 29 additions & 25 deletions plugin/speedb/memtable/hash_spd_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,10 @@ void SpdbVectorContainer::SortThread() {
class HashSpdRep : public MemTableRep {
public:
HashSpdRep(const MemTableRep::KeyComparator& compare, Allocator* allocator,
size_t bucket_size, bool use_seek_parralel_threshold = false);
size_t bucket_size, bool use_seek_parallel_threshold = false);

HashSpdRep(Allocator* allocator, size_t bucket_size,
bool use_seek_parralel_threshold = false);
bool use_seek_parallel_threshold = false);
void PostCreate(const MemTableRep::KeyComparator& compare,
Allocator* allocator);

Expand Down Expand Up @@ -445,22 +445,22 @@ class HashSpdRep : public MemTableRep {

private:
SpdbHashTable spdb_hash_table_;
bool use_seek_parralel_threshold_ = false;
bool use_seek_parallel_threshold_ = false;
std::shared_ptr<SpdbVectorContainer> spdb_vectors_cont_ = nullptr;
};

HashSpdRep::HashSpdRep(const MemTableRep::KeyComparator& compare,
Allocator* allocator, size_t bucket_size,
bool use_seek_parralel_threshold)
: HashSpdRep(allocator, bucket_size, use_seek_parralel_threshold) {
bool use_seek_parallel_threshold)
: HashSpdRep(allocator, bucket_size, use_seek_parallel_threshold) {
spdb_vectors_cont_ = std::make_shared<SpdbVectorContainer>(compare);
}

HashSpdRep::HashSpdRep(Allocator* allocator, size_t bucket_size,
bool use_seek_parralel_threshold)
bool use_seek_parallel_threshold)
: MemTableRep(allocator),
spdb_hash_table_(bucket_size),
use_seek_parralel_threshold_(use_seek_parralel_threshold) {}
use_seek_parallel_threshold_(use_seek_parallel_threshold) {}

void HashSpdRep::PostCreate(const MemTableRep::KeyComparator& compare,
Allocator* allocator) {
Expand Down Expand Up @@ -525,7 +525,7 @@ void HashSpdRep::Get(const LookupKey& k, void* callback_args,
MemTableRep::Iterator* HashSpdRep::GetIterator(Arena* arena) {
const bool empty_iter =
spdb_vectors_cont_->IsEmpty() ||
(use_seek_parralel_threshold_ && !spdb_vectors_cont_->IsReadOnly());
(use_seek_parallel_threshold_ && !spdb_vectors_cont_->IsReadOnly());
if (arena != nullptr) {
void* mem;
if (empty_iter) {
Expand All @@ -543,32 +543,36 @@ MemTableRep::Iterator* HashSpdRep::GetIterator(Arena* arena) {
}
}

static std::unordered_map<std::string, OptionTypeInfo> hash_spd_factory_info = {

{"hash_bucket_count",
{0, OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kDontSerialize /*Since it is part of the ID*/}},
{"use_seek_parralel_threshold",
{0, OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kDontSerialize /*Since it is part of the ID*/}},
static std::unordered_map<std::string, OptionTypeInfo> hash_spdb_factory_info =
{
{"hash_bucket_count",
{offsetof(struct HashSpdbRepOptions, hash_bucket_count),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"use_seek_parallel_threshold",
{offsetof(struct HashSpdbRepOptions, use_seek_parallel_threshold),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
} // namespace

// HashSpdRepFactory

HashSpdRepFactory::HashSpdRepFactory(size_t hash_bucket_count)
: bucket_count_(hash_bucket_count), use_seek_parralel_threshold_(false) {
HashSpdRepFactory::HashSpdRepFactory(size_t hash_bucket_count) {
options_.hash_bucket_count = hash_bucket_count;
options_.use_seek_parallel_threshold = false;

if (hash_bucket_count == 0) {
use_seek_parralel_threshold_ = true;
bucket_count_ = 1000000;
options_.use_seek_parallel_threshold = true;
options_.hash_bucket_count = 1000000;
}
RegisterOptions("", &bucket_count_, &hash_spd_factory_info);
RegisterOptions(&options_, &hash_spdb_factory_info);
Init();
}

MemTableRep* HashSpdRepFactory::PreCreateMemTableRep() {
MemTableRep* hash_spd =
new HashSpdRep(nullptr, bucket_count_, use_seek_parralel_threshold_);
MemTableRep* hash_spd = new HashSpdRep(nullptr, options_.hash_bucket_count,
options_.use_seek_parallel_threshold);
return hash_spd;
}

Expand All @@ -582,8 +586,8 @@ void HashSpdRepFactory::PostCreateMemTableRep(
MemTableRep* HashSpdRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* /*transform*/, Logger* /*logger*/) {
return new HashSpdRep(compare, allocator, bucket_count_,
use_seek_parralel_threshold_);
return new HashSpdRep(compare, allocator, options_.hash_bucket_count,
options_.use_seek_parallel_threshold);
}

} // namespace ROCKSDB_NAMESPACE
11 changes: 8 additions & 3 deletions plugin/speedb/memtable/hash_spd_rep.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@

namespace ROCKSDB_NAMESPACE {

struct HashSpdbRepOptions {
static const char* kName() { return "HashSpdbRepOptions"; }
size_t hash_bucket_count;
bool use_seek_parallel_threshold;
};

class HashSpdRepFactory : public MemTableRepFactory {
public:
explicit HashSpdRepFactory(size_t bucket_count = 1000000);
explicit HashSpdRepFactory(size_t hash_bucket_count = 1000000);

using MemTableRepFactory::CreateMemTableRep;
MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& compare,
Expand All @@ -44,8 +50,7 @@ class HashSpdRepFactory : public MemTableRepFactory {
const char* Name() const override { return kClassName(); }

private:
size_t bucket_count_;
bool use_seek_parralel_threshold_ = false;
HashSpdbRepOptions options_;
};

} // namespace ROCKSDB_NAMESPACE
6 changes: 4 additions & 2 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1864,9 +1864,11 @@ DEFINE_int64(multiread_stride, 0,
DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");

DEFINE_string(memtablerep, "speedb.HashSpdRepFactory", "");

DEFINE_int64(hash_bucket_count, 1000000, "hash bucket count");
DEFINE_bool(use_seek_parralel_threshold, true,
"if use seek parralel threshold .");
DEFINE_bool(use_seek_parallel_threshold, true,
"if use seek parallel threshold .");

DEFINE_bool(use_plain_table, false,
"if use plain table instead of block-based table format");
DEFINE_bool(use_cuckoo_table, false, "if use cuckoo table format");
Expand Down

0 comments on commit c486d55

Please sign in to comment.