Skip to content

Commit

Permalink
hash spd memtable
Browse files Browse the repository at this point in the history
  • Loading branch information
ayulas committed Sep 28, 2022
1 parent 56b1f3d commit 94042b7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 27 deletions.
65 changes: 41 additions & 24 deletions plugin/speedb/memtable/hash_spd_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ struct BucketHeader {

BucketHeader() { items_.clear(); }

bool InternalContains(const MemTableRep::KeyComparator& comparator,
const char* check_key) {
bool InternalContains(const char* check_key,
const MemTableRep::KeyComparator& comparator) {
if (items_.size() == 0) {
return false;
}
Expand All @@ -102,17 +102,17 @@ struct BucketHeader {
return false;
}

bool Contains(const MemTableRep::KeyComparator& comparator,
const char* check_key) {
bool Contains(const char* check_key,
const MemTableRep::KeyComparator& comparator) {
MutexLock l(&mutex_);
return InternalContains(comparator, check_key);
return InternalContains(check_key, comparator);
}

bool Add(SortItem* sort_item, const MemTableRep::KeyComparator& comparator,
bool check_exist) {
MutexLock l(&mutex_);
if (check_exist) {
if (InternalContains(comparator, sort_item->key_)) return false;
if (InternalContains(sort_item->key_, comparator)) return false;
}

items_.push_front(sort_item);
Expand Down Expand Up @@ -703,19 +703,37 @@ class HashLocklessRep : public MemTableRep {

std::shared_ptr<SortVectorContainer> sort_vectors_cont_;

size_t GetHash(const char* key) const {
Slice slice = UserKey(key);
return MurmurHash(slice.data(), static_cast<int>(slice.size()), 0) %
bucket_size_;
static size_t GetHash(const Slice& user_key_without_ts) {
return MurmurHash(user_key_without_ts.data(),
static_cast<int>(user_key_without_ts.size()), 0);
}
bool InsertWithCheck(KeyHandle handle);

BucketHeader* GetBucket(size_t i) const { return &buckets_.get()[i]; }
static Slice UserKeyWithoutTimestamp(
const Slice internal_key, const MemTableRep::KeyComparator& compare) {
#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG)
auto key_comparator =
dynamic_cast<const MemTable::KeyComparator*>(&compare);
assert(key_comparator != nullptr);
#else
auto key_comparator = static_cast<const MemTable::KeyComparator*>(&compare);
#endif
const Comparator* user_comparator =
key_comparator->comparator.user_comparator();
const size_t ts_sz = user_comparator->timestamp_size();
return ExtractUserKeyAndStripTimestamp(internal_key, ts_sz);
}

BucketHeader* GetBucket(const char* key) const {
return GetBucket(GetHash(key));
BucketHeader* GetBucket(const char* key,
const MemTableRep::KeyComparator& comparator) const {
const size_t hash = GetHash(
UserKeyWithoutTimestamp(comparator.decode_key(key), comparator));
BucketHeader* bucket =
const_cast<BucketHeader*>(&buckets_[hash % bucket_size_]);
return bucket;
}

bool InsertWithCheck(KeyHandle handle);

class Iterator : public MemTableRep::Iterator {
public:
// Initialize an iterator over the specified list.
Expand All @@ -725,10 +743,9 @@ class HashLocklessRep : public MemTableRep {
: sort_vectors_cont_(sort_vectors_cont),
iter_heap_info_(comparator),
up_iter_direction_(true) {
IterSortSettingInfo sort_set_info;
sort_vectors_cont_->InitIterator(&sort_set_info);
iter_anchor_ = sort_set_info.iter_anchor_;
iter_sort_items_num_ = sort_set_info.iter_size_;
sort_vectors_cont_->InitIterator(&sort_set_info_);
iter_anchor_ = sort_set_info_.iter_anchor_;
iter_sort_items_num_ = sort_set_info_.iter_size_;
// allocate iter_heap_info
iter_heap_info_.Init(iter_sort_items_num_);
}
Expand Down Expand Up @@ -816,6 +833,7 @@ class HashLocklessRep : public MemTableRep {
uint32_t iter_sort_items_num_;
IterHeapInfo iter_heap_info_;
bool up_iter_direction_;
IterSortSettingInfo sort_set_info_;

protected:
std::string tmp_; // For passing to EncodeKey
Expand Down Expand Up @@ -879,7 +897,7 @@ KeyHandle HashLocklessRep::Allocate(const size_t len, char** buf) {

void HashLocklessRep::Insert(KeyHandle handle) {
SortItem* sort_item = static_cast<SortItem*>(handle);
BucketHeader* bucket = GetBucket(sort_item->key_);
BucketHeader* bucket = GetBucket(sort_item->key_, this->compare_);
bucket->Add(sort_item, this->compare_, false);
// insert to later sorter list
sort_vectors_cont_->Insert(sort_item);
Expand All @@ -889,7 +907,7 @@ void HashLocklessRep::Insert(KeyHandle handle) {

bool HashLocklessRep::InsertWithCheck(KeyHandle handle) {
SortItem* sort_item = static_cast<SortItem*>(handle);
BucketHeader* bucket = GetBucket(sort_item->key_);
BucketHeader* bucket = GetBucket(sort_item->key_, this->compare_);

if (!bucket->Add(sort_item, this->compare_, true)) {
return false;
Expand Down Expand Up @@ -920,9 +938,9 @@ bool HashLocklessRep::InsertKeyConcurrently(KeyHandle handle) {
}

bool HashLocklessRep::Contains(const char* key) const {
BucketHeader* bucket = GetBucket(key);
BucketHeader* bucket = GetBucket(key, this->compare_);

return bucket->Contains(this->compare_, key);
return bucket->Contains(key, this->compare_);
}

void HashLocklessRep::MarkReadOnly() { sort_vectors_cont_->Immutable(); }
Expand All @@ -934,7 +952,7 @@ size_t HashLocklessRep::ApproximateMemoryUsage() {

void HashLocklessRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
BucketHeader* bucket = GetBucket(k.memtable_key().data());
BucketHeader* bucket = GetBucket(k.memtable_key().data(), this->compare_);
MutexLock l(&bucket->mutex_);

for (auto iter = bucket->items_.begin(); iter != bucket->items_.end();
Expand All @@ -947,7 +965,6 @@ void HashLocklessRep::Get(const LookupKey& k, void* callback_args,

MemTableRep::Iterator* HashLocklessRep::GetIterator(Arena* arena) {
bool empty_iter = (sort_vectors_cont_->items_count_.load() == 0);
if (!sort_vectors_cont_->immutable_.load()) empty_iter = true;
if (arena != nullptr) {
void* mem;
if (empty_iter) {
Expand Down
7 changes: 4 additions & 3 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def random_distribution(cuts_count):
"experimental_mempurge_threshold": lambda: 10.0*random.random(),
"max_background_compactions": 1,
"max_bytes_for_level_base": 67108864,
"memtablerep": "skip_list",
"memtablerep": lambda: random.choice(["skip_list", "speedb.HashSpdRepFactory"]),
"target_file_size_base": 16777216,
"target_file_size_multiplier": 1,
"test_batches_snapshots": 0,
Expand Down Expand Up @@ -537,8 +537,9 @@ def finalize_and_sanitize(src_params, counter):
if dest_params.get("compression_type") != "zstd":
dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
# TODO: yuval- add hash_spd memtable
dest_params["memtablerep"] = "skip_list"
dest_params["memtablerep"] = random.choice(
["skip_list", "speedb.HashSpdRepFactory"]
)
if dest_params["mmap_read"] == 1:
dest_params["use_direct_io_for_flush_and_compaction"] = 0
dest_params["use_direct_reads"] = 0
Expand Down

0 comments on commit 94042b7

Please sign in to comment.