Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

570 spdb memtable use seek parralel threshold mishandled #603

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

### Bug Fixes
* unit tests: fix GlobalWriteControllerTest.GlobalAndWBMSetupDelay by waiting for the memtable memory release.
* spdb memtable: use_seek_parallel_threshold option parameter mishandled (#570)

## Fig v2.5.0 (06/14/2023)
Based on RocksDB 8.1.1
Expand Down
54 changes: 29 additions & 25 deletions plugin/speedb/memtable/hash_spd_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,10 @@ void SpdbVectorContainer::SortThread() {
class HashSpdRep : public MemTableRep {
public:
HashSpdRep(const MemTableRep::KeyComparator& compare, Allocator* allocator,
size_t bucket_size, bool use_seek_parralel_threshold = false);
size_t bucket_size, bool use_seek_parallel_threshold = false);

HashSpdRep(Allocator* allocator, size_t bucket_size,
bool use_seek_parralel_threshold = false);
bool use_seek_parallel_threshold = false);
void PostCreate(const MemTableRep::KeyComparator& compare,
Allocator* allocator);

Expand Down Expand Up @@ -445,22 +445,22 @@ class HashSpdRep : public MemTableRep {

private:
SpdbHashTable spdb_hash_table_;
bool use_seek_parralel_threshold_ = false;
bool use_seek_parallel_threshold_ = false;
std::shared_ptr<SpdbVectorContainer> spdb_vectors_cont_ = nullptr;
};

HashSpdRep::HashSpdRep(const MemTableRep::KeyComparator& compare,
Allocator* allocator, size_t bucket_size,
bool use_seek_parralel_threshold)
: HashSpdRep(allocator, bucket_size, use_seek_parralel_threshold) {
bool use_seek_parallel_threshold)
: HashSpdRep(allocator, bucket_size, use_seek_parallel_threshold) {
spdb_vectors_cont_ = std::make_shared<SpdbVectorContainer>(compare);
}

HashSpdRep::HashSpdRep(Allocator* allocator, size_t bucket_size,
bool use_seek_parralel_threshold)
bool use_seek_parallel_threshold)
: MemTableRep(allocator),
spdb_hash_table_(bucket_size),
use_seek_parralel_threshold_(use_seek_parralel_threshold) {}
use_seek_parallel_threshold_(use_seek_parallel_threshold) {}

void HashSpdRep::PostCreate(const MemTableRep::KeyComparator& compare,
Allocator* allocator) {
Expand Down Expand Up @@ -525,7 +525,7 @@ void HashSpdRep::Get(const LookupKey& k, void* callback_args,
MemTableRep::Iterator* HashSpdRep::GetIterator(Arena* arena) {
const bool empty_iter =
spdb_vectors_cont_->IsEmpty() ||
(use_seek_parralel_threshold_ && !spdb_vectors_cont_->IsReadOnly());
(use_seek_parallel_threshold_ && !spdb_vectors_cont_->IsReadOnly());
if (arena != nullptr) {
void* mem;
if (empty_iter) {
Expand All @@ -543,32 +543,36 @@ MemTableRep::Iterator* HashSpdRep::GetIterator(Arena* arena) {
}
}

static std::unordered_map<std::string, OptionTypeInfo> hash_spd_factory_info = {

{"hash_bucket_count",
{0, OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kDontSerialize /*Since it is part of the ID*/}},
{"use_seek_parralel_threshold",
{0, OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kDontSerialize /*Since it is part of the ID*/}},
static std::unordered_map<std::string, OptionTypeInfo> hash_spdb_factory_info =
{
{"hash_bucket_count",
{offsetof(struct HashSpdbRepOptions, hash_bucket_count),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"use_seek_parallel_threshold",
{offsetof(struct HashSpdbRepOptions, use_seek_parallel_threshold),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
} // namespace

// HashSpdRepFactory

HashSpdRepFactory::HashSpdRepFactory(size_t hash_bucket_count)
: bucket_count_(hash_bucket_count), use_seek_parralel_threshold_(false) {
HashSpdRepFactory::HashSpdRepFactory(size_t hash_bucket_count) {
options_.hash_bucket_count = hash_bucket_count;
options_.use_seek_parallel_threshold = false;

if (hash_bucket_count == 0) {
use_seek_parralel_threshold_ = true;
bucket_count_ = 1000000;
options_.use_seek_parallel_threshold = true;
options_.hash_bucket_count = 1000000;
}
RegisterOptions("", &bucket_count_, &hash_spd_factory_info);
RegisterOptions(&options_, &hash_spdb_factory_info);
Init();
}

MemTableRep* HashSpdRepFactory::PreCreateMemTableRep() {
MemTableRep* hash_spd =
new HashSpdRep(nullptr, bucket_count_, use_seek_parralel_threshold_);
MemTableRep* hash_spd = new HashSpdRep(nullptr, options_.hash_bucket_count,
options_.use_seek_parallel_threshold);
return hash_spd;
}

Expand All @@ -582,8 +586,8 @@ void HashSpdRepFactory::PostCreateMemTableRep(
MemTableRep* HashSpdRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* /*transform*/, Logger* /*logger*/) {
return new HashSpdRep(compare, allocator, bucket_count_,
use_seek_parralel_threshold_);
return new HashSpdRep(compare, allocator, options_.hash_bucket_count,
options_.use_seek_parallel_threshold);
}

} // namespace ROCKSDB_NAMESPACE
11 changes: 8 additions & 3 deletions plugin/speedb/memtable/hash_spd_rep.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@

namespace ROCKSDB_NAMESPACE {

struct HashSpdbRepOptions {
static const char* kName() { return "HashSpdbRepOptions"; }
size_t hash_bucket_count;
bool use_seek_parallel_threshold;
};

class HashSpdRepFactory : public MemTableRepFactory {
public:
explicit HashSpdRepFactory(size_t bucket_count = 1000000);
explicit HashSpdRepFactory(size_t hash_bucket_count = 1000000);

using MemTableRepFactory::CreateMemTableRep;
MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& compare,
Expand All @@ -44,8 +50,7 @@ class HashSpdRepFactory : public MemTableRepFactory {
const char* Name() const override { return kClassName(); }

private:
size_t bucket_count_;
bool use_seek_parralel_threshold_ = false;
HashSpdbRepOptions options_;
};

} // namespace ROCKSDB_NAMESPACE
6 changes: 4 additions & 2 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1857,9 +1857,11 @@ DEFINE_int64(multiread_stride, 0,
DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");

DEFINE_string(memtablerep, "speedb.HashSpdRepFactory", "");

DEFINE_int64(hash_bucket_count, 1000000, "hash bucket count");
DEFINE_bool(use_seek_parralel_threshold, true,
"if use seek parralel threshold .");
DEFINE_bool(use_seek_parallel_threshold, true,
"if use seek parallel threshold .");

DEFINE_bool(use_plain_table, false,
"if use plain table instead of block-based table format");
DEFINE_bool(use_cuckoo_table, false, "if use cuckoo table format");
Expand Down