Skip to content

Commit

Permalink
hash spd memtable
Browse files Browse the repository at this point in the history
  • Loading branch information
ayulas committed Sep 29, 2022
1 parent 56b1f3d commit 1fae7aa
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 31 deletions.
73 changes: 45 additions & 28 deletions plugin/speedb/memtable/hash_spd_rep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ struct BucketHeader {

BucketHeader() { items_.clear(); }

bool InternalContains(const MemTableRep::KeyComparator& comparator,
const char* check_key) {
bool InternalContains(const char* check_key,
const MemTableRep::KeyComparator& comparator) {
if (items_.size() == 0) {
return false;
}
Expand All @@ -102,17 +102,17 @@ struct BucketHeader {
return false;
}

bool Contains(const MemTableRep::KeyComparator& comparator,
const char* check_key) {
bool Contains(const char* check_key,
const MemTableRep::KeyComparator& comparator) {
MutexLock l(&mutex_);
return InternalContains(comparator, check_key);
return InternalContains(check_key, comparator);
}

bool Add(SortItem* sort_item, const MemTableRep::KeyComparator& comparator,
bool check_exist) {
MutexLock l(&mutex_);
if (check_exist) {
if (InternalContains(comparator, sort_item->key_)) return false;
if (InternalContains(sort_item->key_, comparator)) return false;
}

items_.push_front(sort_item);
Expand Down Expand Up @@ -456,7 +456,7 @@ class SortVectorContainer {

void Insert(SortItem* new_item);

void InitIterator(IterSortSettingInfo* sort_set_info);
void InitIterator(std::shared_ptr<IterSortSettingInfo> sort_set_info);

void SeekIter(std::list<std::shared_ptr<SortVector>>::iterator iter_anchor,
IterHeapInfo* iter_heap_info, const char* seek_key,
Expand Down Expand Up @@ -517,7 +517,7 @@ void SortVectorContainer::Insert(SortItem* new_item) {
}
}

void SortVectorContainer::InitIterator(IterSortSettingInfo* sort_set_info) {
void SortVectorContainer::InitIterator(std::shared_ptr<IterSortSettingInfo> sort_set_info) {
SortItem* sort_item;
bool immutable = false;

Expand All @@ -530,7 +530,7 @@ void SortVectorContainer::InitIterator(IterSortSettingInfo* sort_set_info) {
sort_set_info->iter_sort_vector_ =
std::make_shared<SortVector>(switch_vector_limit_);
sort_item = sort_set_info->iter_sort_vector_->GetIterPoint();
sort_item->SetSortSetInfo(static_cast<void*>(sort_set_info));
sort_item->SetSortSetInfo(sort_set_info.get());
SortItem* prev_item = last_item_.exchange(sort_item);
prev_item->SetNext(sort_item);
{
Expand Down Expand Up @@ -621,7 +621,7 @@ void SortVectorContainer::SortThread() {
AdvanceAndSort(sort_set_info->iter_sort_vector_);
} else {
// need to add to the empty_iter_sort_vectors_
// TBD AYELET
// TBD AYELET - do we really need that?
empty_iter_sort_vectors_.push_back(
std::make_shared<SortVector>(switch_vector_limit_));
}
Expand Down Expand Up @@ -703,19 +703,37 @@ class HashLocklessRep : public MemTableRep {

std::shared_ptr<SortVectorContainer> sort_vectors_cont_;

size_t GetHash(const char* key) const {
Slice slice = UserKey(key);
return MurmurHash(slice.data(), static_cast<int>(slice.size()), 0) %
bucket_size_;
// Hashes a user key whose timestamp has already been stripped, using
// MurmurHash with seed 0. The raw hash value is returned as-is; no modulo is
// applied here.
static size_t GetHash(const Slice& user_key_without_ts) {
  const char* const bytes = user_key_without_ts.data();
  // MurmurHash is called with an int length, so the size is narrowed.
  const int length = static_cast<int>(user_key_without_ts.size());
  return MurmurHash(bytes, length, 0);
}
bool InsertWithCheck(KeyHandle handle);

BucketHeader* GetBucket(size_t i) const { return &buckets_.get()[i]; }
// Passes `internal_key` through ExtractUserKeyAndStripTimestamp, using the
// timestamp size reported by the user comparator held inside `compare`.
//
// `compare` is downcast to MemTable::KeyComparator. When RTTI is enabled in a
// debug build the downcast is checked with an assert; otherwise it is an
// unchecked static_cast.
static Slice UserKeyWithoutTimestamp(
    const Slice internal_key, const MemTableRep::KeyComparator& compare) {
#if defined(ROCKSDB_USE_RTTI) && !defined(NDEBUG)
  const auto* memtable_cmp =
      dynamic_cast<const MemTable::KeyComparator*>(&compare);
  assert(memtable_cmp != nullptr);
#else
  const auto* memtable_cmp =
      static_cast<const MemTable::KeyComparator*>(&compare);
#endif
  const size_t ts_sz =
      memtable_cmp->comparator.user_comparator()->timestamp_size();
  return ExtractUserKeyAndStripTimestamp(internal_key, ts_sz);
}

BucketHeader* GetBucket(const char* key) const {
return GetBucket(GetHash(key));
// Finds the bucket that `key` maps to.
//
// `key` is decoded through `comparator.decode_key`, reduced to the user key
// without its timestamp, hashed with GetHash, and the hash is taken modulo
// bucket_size_ to index into buckets_.
BucketHeader* GetBucket(const char* key,
                        const MemTableRep::KeyComparator& comparator) const {
  const Slice user_key =
      UserKeyWithoutTimestamp(comparator.decode_key(key), comparator);
  const size_t bucket_idx = GetHash(user_key) % bucket_size_;
  return const_cast<BucketHeader*>(&buckets_[bucket_idx]);
}

bool InsertWithCheck(KeyHandle handle);

class Iterator : public MemTableRep::Iterator {
public:
// Initialize an iterator over the specified list.
Expand All @@ -725,10 +743,9 @@ class HashLocklessRep : public MemTableRep {
: sort_vectors_cont_(sort_vectors_cont),
iter_heap_info_(comparator),
up_iter_direction_(true) {
IterSortSettingInfo sort_set_info;
sort_vectors_cont_->InitIterator(&sort_set_info);
iter_anchor_ = sort_set_info.iter_anchor_;
iter_sort_items_num_ = sort_set_info.iter_size_;
sort_vectors_cont_->InitIterator(sort_set_info_);
iter_anchor_ = sort_set_info_->iter_anchor_;
iter_sort_items_num_ = sort_set_info_->iter_size_;
// allocate iter_heap_info
iter_heap_info_.Init(iter_sort_items_num_);
}
Expand Down Expand Up @@ -816,6 +833,7 @@ class HashLocklessRep : public MemTableRep {
uint32_t iter_sort_items_num_;
IterHeapInfo iter_heap_info_;
bool up_iter_direction_;
std::shared_ptr<IterSortSettingInfo> sort_set_info_;

protected:
std::string tmp_; // For passing to EncodeKey
Expand Down Expand Up @@ -879,7 +897,7 @@ KeyHandle HashLocklessRep::Allocate(const size_t len, char** buf) {

void HashLocklessRep::Insert(KeyHandle handle) {
SortItem* sort_item = static_cast<SortItem*>(handle);
BucketHeader* bucket = GetBucket(sort_item->key_);
BucketHeader* bucket = GetBucket(sort_item->key_, this->compare_);
bucket->Add(sort_item, this->compare_, false);
// insert to later sorter list
sort_vectors_cont_->Insert(sort_item);
Expand All @@ -889,7 +907,7 @@ void HashLocklessRep::Insert(KeyHandle handle) {

bool HashLocklessRep::InsertWithCheck(KeyHandle handle) {
SortItem* sort_item = static_cast<SortItem*>(handle);
BucketHeader* bucket = GetBucket(sort_item->key_);
BucketHeader* bucket = GetBucket(sort_item->key_, this->compare_);

if (!bucket->Add(sort_item, this->compare_, true)) {
return false;
Expand Down Expand Up @@ -920,9 +938,9 @@ bool HashLocklessRep::InsertKeyConcurrently(KeyHandle handle) {
}

bool HashLocklessRep::Contains(const char* key) const {
BucketHeader* bucket = GetBucket(key);
BucketHeader* bucket = GetBucket(key, this->compare_);

return bucket->Contains(this->compare_, key);
return bucket->Contains(key, this->compare_);
}

void HashLocklessRep::MarkReadOnly() { sort_vectors_cont_->Immutable(); }
Expand All @@ -934,7 +952,7 @@ size_t HashLocklessRep::ApproximateMemoryUsage() {

void HashLocklessRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
BucketHeader* bucket = GetBucket(k.memtable_key().data());
BucketHeader* bucket = GetBucket(k.memtable_key().data(), this->compare_);
MutexLock l(&bucket->mutex_);

for (auto iter = bucket->items_.begin(); iter != bucket->items_.end();
Expand All @@ -947,7 +965,6 @@ void HashLocklessRep::Get(const LookupKey& k, void* callback_args,

MemTableRep::Iterator* HashLocklessRep::GetIterator(Arena* arena) {
bool empty_iter = (sort_vectors_cont_->items_count_.load() == 0);
if (!sort_vectors_cont_->immutable_.load()) empty_iter = true;
if (arena != nullptr) {
void* mem;
if (empty_iter) {
Expand Down
7 changes: 4 additions & 3 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def random_distribution(cuts_count):
"experimental_mempurge_threshold": lambda: 10.0*random.random(),
"max_background_compactions": 1,
"max_bytes_for_level_base": 67108864,
"memtablerep": "skip_list",
"memtablerep": lambda: random.choice(["skip_list", "speedb.HashSpdRepFactory"]),
"target_file_size_base": 16777216,
"target_file_size_multiplier": 1,
"test_batches_snapshots": 0,
Expand Down Expand Up @@ -537,8 +537,9 @@ def finalize_and_sanitize(src_params, counter):
if dest_params.get("compression_type") != "zstd":
dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
# TODO: yuval- add hash_spd memtable
dest_params["memtablerep"] = "skip_list"
dest_params["memtablerep"] = random.choice(
["skip_list", "speedb.HashSpdRepFactory"]
)
if dest_params["mmap_read"] == 1:
dest_params["use_direct_io_for_flush_and_compaction"] = 0
dest_params["use_direct_reads"] = 0
Expand Down

0 comments on commit 1fae7aa

Please sign in to comment.